Wednesday, August 25, 2021

Bi-directional BLE communication between Raspberry Pi/Python (with PyQt5 GUI) and ESP32/Arduino Nano RP2040 Connect

Raspberry Pi/Python/bluepy + ESP32

With bluepy installed, this exercise implements the BLE client side on a Raspberry Pi in Python: it connects to the ESP32 BLE UART server, then sends and receives data.


Prepare ESP32 BLE_uart

ESP32 side (NodeMCU ESP-32S) is programmed in Arduino framework. 

In Arduino IDE
- Open Examples > ESP32 BLE Arduino > BLE_uart in Arduino IDE, and upload to ESP32 board.

The example sets the ESP32 up as a BLE server that waits for a connection. Once connected, it sends data to the client repeatedly and displays received data on the Serial Monitor.

Test with nRF Connect App 

To verify that the ESP32 BLE_uart example works, install the nRF Connect for Mobile app by Nordic Semiconductor ASA on an Android device.

Simple test Python code -

Automatically connect to BLE_uart, send and receive data repeatedly.

 ex_bluepy_uart.py

from bluepy import btle
import time

class MyDelegate(btle.DefaultDelegate):
    """Notification delegate that prints every payload it receives."""

    def __init__(self):
        super().__init__()
        # Extra per-delegate state would be initialised here.

    def handleNotification(self, cHandle, data):
        """Print the raw notification payload.

        cHandle is accepted but not inspected; a fuller client could use it
        to tell characteristics apart before processing 'data'.
        """
        print(data)

# Initialisation  -------

p = btle.Peripheral("3c:71:bf:0d:dd:6a")   #NodeMCU-32S
#p = btle.Peripheral("24:0a:c4:e8:0f:9a")   #ESP32-DevKitC V4

try:
    # Look up the UART service and its two characteristics. This client
    # writes to ch_Tx (6E400002) and enables notifications for ch_Rx (6E400003).
    svc = p.getServiceByUUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
    ch_Tx = svc.getCharacteristics("6E400002-B5A3-F393-E0A9-E50E24DCCA9E")[0]
    ch_Rx = svc.getCharacteristics("6E400003-B5A3-F393-E0A9-E50E24DCCA9E")[0]

    p.setDelegate(MyDelegate())

    # Turn notifications on by writing 0x0001 (little-endian) to the handle
    # right after the value handle.
    # NOTE(review): assumes the configuration descriptor sits at valHandle+1.
    setup_data = b"\x01\x00"
    p.writeCharacteristic(ch_Rx.valHandle + 1, setup_data)

    # Start from the current second so the first message goes out when the
    # clock next ticks over.
    last_sent = time.strftime("%H:%M:%S")

    while True:
        # Block for up to one second instead of spinning on the clock; any
        # notification that arrives is passed to MyDelegate.handleNotification.
        p.waitForNotifications(1.0)

        stringtime = time.strftime("%H:%M:%S")
        if stringtime != last_sent:
            last_sent = stringtime
            btime = bytes(stringtime, 'utf-8')
            try:
                ch_Tx.write(btime, True)
            except btle.BTLEException:
                print("btle.BTLEException")

        # Perhaps do something else here
finally:
    # Release the connection on Ctrl+C or on an unhandled BLE error.
    p.disconnect()
Python code with PyQt5 GUI -
- Click "Start BLE" button to connect to BLE_uart.
- Display received data on QPlainTextEdit, and send user entered data to BLE



pyqt5_bluepy_thread_.py
import sys
import time

import requests
from PyQt5.QtCore import QObject, QRunnable, QThreadPool, QTimer, pyqtSignal, pyqtSlot
from PyQt5.QtWidgets import (
    QApplication, QLabel, QMainWindow,  QPlainTextEdit, QPushButton, QVBoxLayout, QWidget,
    )

from bluepy import btle

class WorkerSignals(QObject):
    """Qt signals used by the BLE worker to report back to the GUI.

    WorkerBLE creates one instance and exposes it as its `signals` attribute.
    """

    # Status text from the worker; WorkerBLE.run emits "WorkerBLE start" on it.
    signalMsg = pyqtSignal(str)
    # Decoded notification text; emitted by MyDelegate.handleNotification.
    signalRes = pyqtSignal(str)
    
class MyDelegate(btle.DefaultDelegate):
    """Notification delegate that forwards decoded text through a Qt signal."""

    def __init__(self, sgn):
        """Keep `sgn`, the object whose `signalRes` signal carries the text."""
        super().__init__()
        self.sgn = sgn

    def handleNotification(self, cHandle, data):
        """Decode a notification payload and emit it on `signalRes`.

        Payloads that cannot be decoded are printed and dropped.
        """
        try:
            text = data.decode()
        except UnicodeError:
            print("UnicodeError: ", data)
            return
        self.sgn.signalRes.emit(text)

class WorkerBLE(QRunnable):
    """Background BLE client for the BLE_uart service.

    run() connects to the peripheral, enables notifications and then loops:
    it waits up to one second for a notification, then writes any messages
    queued by toSendBLE(). Status text goes out on `signals.signalMsg`,
    received text on `signals.signalRes` (via MyDelegate).
    """

    DEFAULT_ADDRESS = "3c:71:bf:0d:dd:6a"
    SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
    TX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"   # written by this client
    RX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"   # notifies this client

    def __init__(self, address=DEFAULT_ADDRESS):
        """Create the worker.

        address: MAC address of the peripheral to connect to; defaults to
        the board used in the original listing.
        """
        # Function-scope import: the file's import block does not provide it.
        import queue

        super().__init__()
        self.signals = WorkerSignals()
        self.address = address
        # FIFO of encoded messages handed over from the GUI thread. A queue
        # keeps every message, where a single flag/value pair could lose or
        # repeat one when two sends land within the same poll interval.
        self._outbox = queue.Queue()
        self._running = True

    @pyqtSlot()
    def run(self):
        """Connect, then service notifications and queued writes until stopped.

        A BLE error ends the loop and is reported on `signalMsg`. The
        peripheral is disconnected and "WorkerBLE end" is emitted on every
        exit path.
        """
        self.signals.signalMsg.emit("WorkerBLE start")

        p = None
        try:
            p = btle.Peripheral(self.address)
            p.setDelegate(MyDelegate(self.signals))

            svc = p.getServiceByUUID(self.SERVICE_UUID)
            self.ch_Tx = svc.getCharacteristics(self.TX_CHAR_UUID)[0]
            ch_Rx = svc.getCharacteristics(self.RX_CHAR_UUID)[0]

            # Enable notifications by writing 0x0001 (little-endian).
            # NOTE(review): assumes the configuration descriptor sits at
            # valHandle+1.
            p.writeCharacteristic(ch_Rx.valHandle + 1, b"\x01\x00")

            # BLE loop --------
            while self._running:
                # Returns early when a notification arrives, otherwise after
                # the timeout, so queued writes wait at most about a second.
                p.waitForNotifications(1.0)
                self._send_pending()
        except btle.BTLEException as err:
            print("btle.BTLEException")
            self.signals.signalMsg.emit("WorkerBLE error: %s" % err)
        finally:
            if p is not None:
                try:
                    p.disconnect()
                except btle.BTLEException:
                    print("btle.BTLEException")
            self.signals.signalMsg.emit("WorkerBLE end")

    def _send_pending(self):
        """Write every queued message to the TX characteristic, oldest first."""
        # run() is the only consumer, so empty() followed by get_nowait()
        # cannot race with another reader.
        while not self._outbox.empty():
            payload = self._outbox.get_nowait()
            try:
                self.ch_Tx.write(payload, True)
            except btle.BTLEException:
                print("btle.BTLEException")

    def toSendBLE(self, tosend):
        """Queue the string `tosend` (UTF-8 encoded) for the BLE loop to write.

        Messages queued before the connection is up are written once the
        loop starts.
        """
        self._outbox.put(bytes(tosend, 'utf-8'))

    def stop(self):
        """Ask run() to leave its loop after the current poll and disconnect."""
        self._running = False
            
class MainWindow(QMainWindow):
    """Main window: start the BLE worker, show received text, send typed text.

    Layout, top to bottom: "Start BLE" button, read-only console for received
    data, editable box for outgoing text, "Send message" button.
    """

    def __init__(self):
        super().__init__()

        # No worker exists until "Start BLE" is pressed; sendBLE() checks this
        # so pressing "Send message" first does not raise AttributeError.
        self.workerBLE = None

        layout = QVBoxLayout()

        buttonStartBLE = QPushButton("Start BLE")
        buttonStartBLE.pressed.connect(self.startBLE)

        self.console = QPlainTextEdit()
        self.console.setReadOnly(True)

        self.outconsole = QPlainTextEdit()

        buttonSendBLE = QPushButton("Send message")
        buttonSendBLE.pressed.connect(self.sendBLE)

        layout.addWidget(buttonStartBLE)
        layout.addWidget(self.console)
        layout.addWidget(self.outconsole)
        layout.addWidget(buttonSendBLE)

        w = QWidget()
        w.setLayout(layout)

        self.setCentralWidget(w)

        self.show()
        self.threadpool = QThreadPool()
        print(
            "Multithreading with Maximum %d threads" % self.threadpool.maxThreadCount())

    def startBLE(self):
        """Create a BLE worker, wire its signals and run it on the thread pool."""
        self.workerBLE = WorkerBLE()
        self.workerBLE.signals.signalMsg.connect(self.slotMsg)
        self.workerBLE.signals.signalRes.connect(self.slotRes)
        self.threadpool.start(self.workerBLE)

    def sendBLE(self):
        """Hand the text in the outgoing box to the worker for sending."""
        if self.workerBLE is None:
            print("BLE not started: press \"Start BLE\" before sending")
            return
        strToSend = self.outconsole.toPlainText()
        self.workerBLE.toSendBLE(strToSend)

    def slotMsg(self, msg):
        """Print a status message from the worker."""
        print(msg)

    def slotRes(self, res):
        """Append text received over BLE to the read-only console."""
        self.console.appendPlainText(res)
        
# Create the application and main window, run the Qt event loop, and use the
# loop's return code as the process exit status.
app = QApplication(sys.argv)
window = MainWindow()
sys.exit(app.exec())



Raspberry Pi/Python/bluepy + Arduino Nano RP2040 Connect

Now replace the ESP32 with an Arduino Nano RP2040 Connect running the following code, which uses the ArduinoBLE library. Please note that you have to modify the Python code to match the Arduino Nano RP2040 Connect's MAC address.


BLE_peripheral_uart.ino
/*
 * BLE_peripheral_uart:
 * modified from Examples > ArduinoBLE > Peripheral > CallbackLED
 * 
 * Bi-directional BLE communication between Raspberry Pi/Python (with PyQt5 GUI) 
 * and ESP32/Arduino Nano RP2040 Connect
 * http://helloraspberrypi.blogspot.com/2021/08/bi-direction-ble-communication-between.html
 * 
*/

#include <ArduinoBLE.h>

// UUIDs of the UART-style service and its two characteristics. RX/TX are
// named from this peripheral's point of view: RX is written by the central,
// TX notifies the central.
#define SERVICE_UUID           "6E400001-B5A3-F393-E0A9-E50E24DCCA9E" // UART service UUID
#define CHARACTERISTIC_UUID_RX "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
#define CHARACTERISTIC_UUID_TX "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"

BLEService uartService(SERVICE_UUID); // create service

// RX is created with the BLEWrite property and TX with BLENotify.
// NOTE(review): the third argument (30) is presumably the maximum value
// size - confirm against the ArduinoBLE reference.
BLEStringCharacteristic rxCharacteristic(CHARACTERISTIC_UUID_RX, BLEWrite, 30);
BLEStringCharacteristic txCharacteristic(CHARACTERISTIC_UUID_TX, BLENotify, 30);


// One-time initialisation: open the serial port, start BLE, register the
// UART service with its two characteristics, install the event handlers and
// begin advertising.
void setup() {
  Serial.begin(115200);
  // Spin until Serial reports ready.
  // NOTE(review): on boards with native USB this presumably blocks until a
  // host opens the port, so nothing below runs without one - confirm.
  while (!Serial);
  
  // begin initialization; on failure report it and halt here forever
  if (!BLE.begin()) {
    Serial.println("starting BLE failed!");

    while (1);
  }
  
  // set the local name peripheral advertises
  BLE.setLocalName("BLE_peripheral_uart");
  // set the UUID for the service this peripheral advertises
  BLE.setAdvertisedService(uartService);

  // add the characteristics to the service
  uartService.addCharacteristic(rxCharacteristic);
  uartService.addCharacteristic(txCharacteristic);

  // add service
  BLE.addService(uartService);

  // assign event handlers for connected, disconnected to peripheral
  BLE.setEventHandler(BLEConnected, blePeripheralConnectHandler);
  BLE.setEventHandler(BLEDisconnected, blePeripheralDisconnectHandler);

  // call rxCharacteristicWritten whenever the RX characteristic is written
  rxCharacteristic.setEventHandler(BLEWritten, rxCharacteristicWritten);
  // set an initial value for the characteristic
  rxCharacteristic.setValue("BLE_peripheral_uart");

  // start advertising
  BLE.advertise();

  // print a banner followed by this device's BLE address
  Serial.println(("Bluetooth device active, waiting for connections..."));
  Serial.println(BLE.address());
}

// Main loop: does nothing but poll BLE. The connect, disconnect and written
// handlers are the ones registered in setup().
void loop() {
  // poll for BLE events
  BLE.poll();
}

void blePeripheralConnectHandler(BLEDevice central) {
  // Connected event: log the address of the central that connected.
  const String centralAddress = central.address();
  Serial.print("Connected event, central: ");
  Serial.println(centralAddress);
}

void blePeripheralDisconnectHandler(BLEDevice central) {
  // Disconnected event: log the address of the central that went away.
  const String centralAddress = central.address();
  Serial.print("Disconnected event, central: ");
  Serial.println(centralAddress);
}

void rxCharacteristicWritten(BLEDevice central, BLECharacteristic characteristic) {
  // The RX characteristic was written: log the value's length and text, then
  // set the upper-cased text as the TX characteristic's value.
  Serial.print("Characteristic event, written: ");

  String lengthReport = "len=";
  lengthReport += String(rxCharacteristic.valueLength());
  Serial.println(lengthReport);

  String received = rxCharacteristic.value();
  Serial.println(received);

  received.toUpperCase();
  Serial.println(received);

  txCharacteristic.setValue(received);
}



No comments: