Wednesday, February 24, 2021

TCP socket communication between (RPi Pico+ESP-01S) and ESP32 via WiFi

In my previous MicroPython exercises:
Pico/MicroPython + ESP-01S (AT Command) act as TCP Client connect to Raspberry Pi/Python TCP Server: Python code run on Raspberry Pi act as server. RPi Pico+ESP-01S act as client, connect and send data to server. The server once receive data, convert to upper case and send back to client.
ESP32/MicroPython exercise: act as Access Point, and setup a simple web server

In this exercise,
- the ESP32 web server code is modified to replace the role of Raspberry Pi/Python Server. Setup as WiFi Access Point, run a socket server on port 9999, wait connection from client, receive data, convert to upper case and echo back.
- RPi Pico+ESP-01S join the ESP32 WiFi network, connect to 192.168.4.1 (the default IP of ESP32 SoftAP), port 9999, send data and wait response.

Both coded in MicroPython.


In my practice as shown in the video, a Raspberry Pi 4B is used as development host for both ESP32 and Pico using MicroPython. Firstly, in Thonny, save server code (upyESP32_AP_EchoSvr_20210224a.py) on ESP32, name main.py. Switch Thonny interpreter for Pico and port. Then open Putty as serial terminal connect to ESP32 as REPL to monitor the output from ESP32. Finally, run the client code (mpyPico_ESP-01S_TCPclient_20210224a.py) in Thonny.

upyESP32_AP_EchoSvr_20210224a.py, run on ESP32 as server.
import uos
import network
import usocket
"""
ESP32/MicroPython exercise:
ESP32 act as Access Point,
and setup a simple TCP echo server

ref:
MicroPython usocket – socket module
https://docs.micropython.org/en/latest/library/usocket.html
"""

ssid= "ESP32-ssid"
password="password"

print("----- MicroPython -----")
for u in uos.uname():
    print(u)
print("-----------------------")

ap = network.WLAN(network.AP_IF) # Access Point
ap.config(essid=ssid,
          password=password,
          authmode=network.AUTH_WPA_WPA2_PSK) 
ap.config(max_clients=1)  # max number of client
ap.active(True)           # activate the access point

print(ap.ifconfig())
print(dir(ap))

mysocket = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)
mysocket.setsockopt(usocket.SOL_SOCKET, usocket.SO_REUSEADDR, 1)

mysocket.bind(('', 9999))
mysocket.listen(1)

while True:
  conn, addr = mysocket.accept()
  print('Connected from: %s' % str(addr))
  print()
  request = conn.recv(1024)
  print('request: %s' % str(request))
  print()
  conn.send(request.upper())
  conn.send('\r\n')
  conn.close()

mpyPico_ESP-01S_TCPclient_20210224a.py, run on Raspberry Pi Pico, as client.
import uos
import machine
import utime
"""
Raspberry Pi Pico/MicroPython + ESP-01S exercise

ESP-01S(ESP8266) with AT-command firmware:
AT version:1.7.4.0(May 11 2020 19:13:04)

Pico send AT command to ESP-01S via UART,
- set in station mode
- join AP
- connect to server ip:port 9999
- send text and wait response
"""
#server port & ip hard-coded,
#have to match with server side setting
server_ip="192.168.4.1"
server_port=9999

print()
print("Machine: \t" + uos.uname()[4])
print("MicroPython: \t" + uos.uname()[3])

#indicate program started visually
led_onboard = machine.Pin(25, machine.Pin.OUT)
led_onboard.value(0)     # onboard LED OFF/ON for 0.5/1.0 sec
utime.sleep(0.5)
led_onboard.value(1)
utime.sleep(1.0)
led_onboard.value(0)

uart0 = machine.UART(0, baudrate=115200)
print(uart0)

def sendCMD_waitResp(cmd, uart=uart0, timeout=2000):
    print("CMD: " + cmd)
    uart.write(cmd)
    waitResp(uart, timeout)
    print()
    
def waitResp(uart=uart0, timeout=2000):
    prvMills = utime.ticks_ms()
    resp = b""
    while (utime.ticks_ms()-prvMills)<timeout:
        if uart.any():
            resp = b"".join([resp, uart.read(1)])
    print("resp:")
    try:
        print(resp.decode())
    except UnicodeError:
        print(resp)
        
# send CMD to uart,
# wait and show response without return
def sendCMD_waitAndShow(cmd, uart=uart0):
    print("CMD: " + cmd)
    uart.write(cmd)
    while True:
        print(uart.readline())
        
def espSend(text="test", uart=uart0):
    sendCMD_waitResp('AT+CIPSEND=' + str(len(text)) + '\r\n')
    sendCMD_waitResp(text)
        
def XmonitorESP(uart=uart0):
    """
    while True:
        line=uart.readline()
        try:
            print(line.decode())
        except UnicodeError:
            print(line)
    """
    while True:
        if uart.any():
            print(uart.read(1))
    
sendCMD_waitResp('AT\r\n')          #Test AT startup
sendCMD_waitResp('AT+GMR\r\n')      #Check version information
#sendCMD_waitResp('AT+RESTORE\r\n')  #Restore Factory Default Settings

sendCMD_waitResp('AT+CWMODE?\r\n')  #Query the Wi-Fi mode
sendCMD_waitResp('AT+CWMODE=1\r\n') #Set the Wi-Fi mode 1 = Station mode
#sendCMD_waitResp('AT+CWMODE=2\r\n') #Set the Wi-Fi mode 2 = S0ftAP mode
sendCMD_waitResp('AT+CWMODE?\r\n')  #Query the Wi-Fi mode again

#sendCMD_waitResp('AT+CWLAP\r\n', timeout=10000) #List available APs
sendCMD_waitResp('AT+CWJAP="ESP32-ssid","password"\r\n', timeout=5000) #Connect to AP
sendCMD_waitResp('AT+CIFSR\r\n')    #Obtain the Local IP Address
#sendCMD_waitResp('AT+CIPSTART="TCP","192.168.12.147",9999\r\n')
sendCMD_waitResp('AT+CIPSTART="TCP","' +
                 server_ip +
                 '",' +
                 str(server_port) +
                 '\r\n')
espSend()

while True:
    print('Enter something:')
    msg = input()
    #sendCMD_waitResp('AT+CIPSTART="TCP","192.168.12.147",9999\r\n')
    sendCMD_waitResp('AT+CIPSTART="TCP","' +
                 server_ip +
                 '",' +
                 str(server_port) +
                 '\r\n')
    espSend(msg)

Next:


No comments: