Raspberry Pi/Python/bluepy + ESP32
With bluepy installed, this exercise implements the BLE client side on a Raspberry Pi using Python: it connects to an ESP32 BLE UART server, then sends and receives data in both directions.
Prepare ESP32 BLE_uart
ESP32 side (NodeMCU ESP-32S) is programmed in Arduino framework.
In Arduino IDE
- Open Examples > ESP32 BLE Arduino > BLE_uart in
Arduino IDE, and upload to ESP32 board.
It sets the ESP32 up as a BLE server that waits for a connection. Once connected, it sends data to the client repeatedly and displays received data in the Serial Monitor.
Test with nRF Connect App
To verify that the ESP32 BLE_uart example works, install the nRF Connect for Mobile app by Nordic Semiconductor ASA on an Android device.
Simple test Python code -
Automatically connect to BLE_uart, send and receive data repeatedly.
ex_bluepy_uart.py
from bluepy import btle
import time
class MyDelegate(btle.DefaultDelegate):
    """Notification delegate that prints every received payload to stdout."""

    def __init__(self):
        super().__init__()

    def handleNotification(self, cHandle, data):
        """Print the payload of an incoming notification.

        Args:
            cHandle: Handle of the characteristic that produced the
                notification (not inspected here).
            data: Payload handed over by bluepy; printed unchanged.
        """
        print(data)
# Initialisation -------
# Connect to the BLE UART server. The MAC address below belongs to the
# author's board; replace it with the address of your own device.
p = btle.Peripheral("3c:71:bf:0d:dd:6a")  # NodeMCU-32S
# p = btle.Peripheral("24:0a:c4:e8:0f:9a")  # ESP32-DevKitC V4

try:
    # Look up the UART service and its two characteristics:
    #   ch_Tx - the characteristic this client writes to
    #   ch_Rx - the characteristic this client receives notifications from
    svc = p.getServiceByUUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
    ch_Tx = svc.getCharacteristics("6E400002-B5A3-F393-E0A9-E50E24DCCA9E")[0]
    ch_Rx = svc.getCharacteristics("6E400003-B5A3-F393-E0A9-E50E24DCCA9E")[0]

    p.setDelegate(MyDelegate())

    # Turn notifications on by writing 0x0001 to the handle directly after
    # the characteristic value.
    # NOTE(review): assumes the configuration descriptor sits at
    # valHandle + 1 - confirm for other servers.
    setup_data = b"\x01\x00"
    p.writeCharacteristic(ch_Rx.valHandle + 1, setup_data)

    lasttime = time.localtime()

    while True:
        # Service incoming notifications (dispatched to MyDelegate) and wait
        # up to the timeout, so the loop does not spin at full CPU speed.
        p.waitForNotifications(0.1)

        # Send the current wall-clock time whenever it has advanced
        # (time.localtime() has one-second resolution).
        nowtime = time.localtime()
        if nowtime > lasttime:
            lasttime = nowtime
            stringtime = time.strftime("%H:%M:%S", nowtime)
            btime = bytes(stringtime, 'utf-8')
            try:
                ch_Tx.write(btime, True)
            except btle.BTLEException:
                print("btle.BTLEException")
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop this script; exit without a traceback.
    pass
finally:
    # Always release the connection so the server can accept a new client.
    p.disconnect()
Python code with PyQt5 GUI -
import sys
import time
import requests
from PyQt5.QtCore import QObject, QRunnable, QThreadPool, QTimer, pyqtSignal, pyqtSlot
from PyQt5.QtWidgets import (
QApplication, QLabel, QMainWindow, QPlainTextEdit, QPushButton, QVBoxLayout, QWidget,
)
from bluepy import btle
class WorkerSignals(QObject):
    """Signals used by the BLE worker to report back to the GUI."""

    # Status text from the worker (e.g. start/end messages).
    signalMsg = pyqtSignal(str)
    # Decoded text of a notification received from the BLE device.
    signalRes = pyqtSignal(str)
class MyDelegate(btle.DefaultDelegate):
    """Notification delegate that forwards decoded payloads via a Qt signal."""

    def __init__(self, sgn):
        """Store the signal holder.

        Args:
            sgn: Object exposing a ``signalRes`` signal that accepts a str.
        """
        super().__init__()
        self.sgn = sgn

    def handleNotification(self, cHandle, data):
        """Decode the payload and emit it on ``signalRes``.

        Payloads that cannot be decoded are printed to stdout instead of
        being emitted.

        Args:
            cHandle: Handle of the notifying characteristic (not inspected).
            data: Raw payload; decoded with the default codec of ``decode()``.
        """
        try:
            self.sgn.signalRes.emit(data.decode())
        except UnicodeError:
            print("UnicodeError: ", data)
class WorkerBLE(QRunnable):
    """Background task that owns the BLE connection.

    The worker connects to the BLE UART server, enables notifications and
    then loops: incoming notifications are forwarded through
    ``signals.signalRes``, and text queued with :meth:`toSendBLE` is written
    to the device. Status and error messages are reported through
    ``signals.signalMsg``.
    """

    def __init__(self):
        super().__init__()
        self.signals = WorkerSignals()
        # Set by toSendBLE(); cleared by run() once the data is picked up.
        self.rqsToSend = False
        # Pending payload; initialised so the attribute always exists.
        self.bytestosend = b""

    @pyqtSlot()
    def run(self):
        """Connect to the device and service it until the link fails.

        BLE errors (failed connection, lost link) are reported on
        ``signalMsg`` instead of escaping from the worker thread, and the
        connection is always released before "WorkerBLE end" is emitted.
        """
        self.signals.signalMsg.emit("WorkerBLE start")

        p = None
        try:
            p = btle.Peripheral("3c:71:bf:0d:dd:6a")
            p.setDelegate(MyDelegate(self.signals))

            svc = p.getServiceByUUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
            # ch_Tx: characteristic written by this client.
            self.ch_Tx = svc.getCharacteristics("6E400002-B5A3-F393-E0A9-E50E24DCCA9E")[0]
            # ch_Rx: characteristic that notifies this client.
            ch_Rx = svc.getCharacteristics("6E400003-B5A3-F393-E0A9-E50E24DCCA9E")[0]

            # Enable notifications.
            # NOTE(review): assumes the configuration descriptor sits at
            # valHandle + 1 - confirm for other servers.
            setup_data = b"\x01\x00"
            p.writeCharacteristic(ch_Rx.valHandle + 1, setup_data)

            # BLE loop --------
            while True:
                # Dispatches notifications to MyDelegate; returns after at
                # most one second so pending sends are picked up regularly.
                p.waitForNotifications(1.0)

                if self.rqsToSend:
                    self.rqsToSend = False
                    try:
                        self.ch_Tx.write(self.bytestosend, True)
                    except btle.BTLEException:
                        print("btle.BTLEException")
        except btle.BTLEException as err:
            self.signals.signalMsg.emit("WorkerBLE error: {}".format(err))
        finally:
            if p is not None:
                try:
                    p.disconnect()
                except (btle.BTLEException, OSError):
                    # Best-effort cleanup: the link is already broken.
                    pass
            self.signals.signalMsg.emit("WorkerBLE end")

    def toSendBLE(self, tosend):
        """Queue text to be written to the device by the worker loop.

        Args:
            tosend: Text to send; encoded as UTF-8.
        """
        # Store the payload before raising the flag so run() never sees the
        # flag without the matching data.
        self.bytestosend = bytes(tosend, 'utf-8')
        self.rqsToSend = True
class MainWindow(QMainWindow):
    """Main window: start the BLE worker, show received text, send text."""

    def __init__(self):
        super().__init__()

        # No worker exists until "Start BLE" is pressed.
        self.workerBLE = None

        layout = QVBoxLayout()

        buttonStartBLE = QPushButton("Start BLE")
        buttonStartBLE.pressed.connect(self.startBLE)

        # Read-only log of text received from the device.
        self.console = QPlainTextEdit()
        self.console.setReadOnly(True)

        # Editable box holding the text to send.
        self.outconsole = QPlainTextEdit()

        buttonSendBLE = QPushButton("Send message")
        buttonSendBLE.pressed.connect(self.sendBLE)

        layout.addWidget(buttonStartBLE)
        layout.addWidget(self.console)
        layout.addWidget(self.outconsole)
        layout.addWidget(buttonSendBLE)

        w = QWidget()
        w.setLayout(layout)
        self.setCentralWidget(w)

        self.show()

        self.threadpool = QThreadPool()
        print(
            "Multithreading with Maximum %d threads" % self.threadpool.maxThreadCount())

    def startBLE(self):
        """Create the BLE worker and run it on the thread pool.

        Ignored while a worker is already active, so a second press cannot
        start a competing connection to the same device.
        """
        if self.workerBLE is not None:
            print("BLE worker already started")
            return

        self.workerBLE = WorkerBLE()
        self.workerBLE.signals.signalMsg.connect(self.slotMsg)
        self.workerBLE.signals.signalRes.connect(self.slotRes)
        self.threadpool.start(self.workerBLE)

    def sendBLE(self):
        """Hand the text in the output box to the worker for sending."""
        if self.workerBLE is None:
            # Pressing "Send message" first used to raise AttributeError.
            print("BLE worker not started")
            return

        strToSend = self.outconsole.toPlainText()
        self.workerBLE.toSendBLE(strToSend)

    def slotMsg(self, msg):
        """Print a status message from the worker."""
        print(msg)
        # The worker announces its termination with this message; forget it
        # so that "Start BLE" can be used again.
        if msg == "WorkerBLE end":
            self.workerBLE = None

    def slotRes(self, res):
        """Append text received from the device to the console."""
        self.console.appendPlainText(res)
# Application entry point. The QApplication must exist before any widget is
# created; MainWindow shows itself from its constructor, and app.exec() then
# runs the Qt event loop.
app = QApplication(sys.argv)
window = MainWindow()
app.exec()
Raspberry Pi/Python/bluepy + Arduino Nano RP2040 Connect
/*
* BLE_peripheral_uart:
* modified from Examples > ArduinoBLE > Peripheral > CallbackLED
*
* Bi-direction BLE communication between Raspberry Pi/Python (with PyQt5 GUI)
* and ESP32/Arduino Nano RP2040 Connect
* http://helloraspberrypi.blogspot.com/2021/08/bi-direction-ble-communication-between.html
*
*/
#include <ArduinoBLE.h>
// UUIDs of the UART-style service and its two characteristics.
// RX and TX are named from this peripheral's point of view:
//   RX - written by the central (data received by this sketch)
//   TX - notified to the central (data sent by this sketch)
#define SERVICE_UUID "6E400001-B5A3-F393-E0A9-E50E24DCCA9E" // UART service UUID
#define CHARACTERISTIC_UUID_RX "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
#define CHARACTERISTIC_UUID_TX "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
// Left over from the CallbackLED example this sketch is based on:
//BLEService ledService("19B10000-E8F2-537E-4F6C-D104768A1214"); // create service
BLEService uartService(SERVICE_UUID); // create service
// Left over from the CallbackLED example (switch characteristic, read/write):
//BLEByteCharacteristic switchCharacteristic("19B10001-E8F2-537E-4F6C-D104768A1214", BLERead | BLEWrite);
// String characteristics; the third argument (30) is the value size passed to ArduinoBLE.
// rxCharacteristic is write-only for the central, txCharacteristic is notify-only.
BLEStringCharacteristic rxCharacteristic(CHARACTERISTIC_UUID_RX, BLEWrite, 30);
BLEStringCharacteristic txCharacteristic(CHARACTERISTIC_UUID_TX, BLENotify, 30);
// const int ledPin = LED_BUILTIN; // pin to use for the LED
// One-time initialisation: open the serial port, start BLE, publish the UART
// service with its RX/TX characteristics, register callbacks and advertise.
void setup() {
  Serial.begin(115200);
  // Block until the serial port is ready so the messages below are not lost.
  while (!Serial) {
  }

  // Bring up the BLE stack; without it the sketch cannot do anything useful.
  if (!BLE.begin()) {
    Serial.println("starting BLE failed!");
    // Halt here forever.
    for (;;) {
    }
  }

  // Name and service UUID carried in the advertisement.
  BLE.setLocalName("BLE_peripheral_uart");
  BLE.setAdvertisedService(uartService);

  // Attach both characteristics to the service, then register the service.
  uartService.addCharacteristic(rxCharacteristic);
  uartService.addCharacteristic(txCharacteristic);
  BLE.addService(uartService);

  // Connection state callbacks.
  BLE.setEventHandler(BLEConnected, blePeripheralConnectHandler);
  BLE.setEventHandler(BLEDisconnected, blePeripheralDisconnectHandler);

  // Callback fired when the central writes to the RX characteristic.
  rxCharacteristic.setEventHandler(BLEWritten, rxCharacteristicWritten);

  // Initial value of the RX characteristic.
  rxCharacteristic.setValue("BLE_peripheral_uart");

  // Start advertising and print this device's BLE address.
  BLE.advertise();
  Serial.println("Bluetooth device active, waiting for connections...");
  Serial.println(BLE.address());
}
// Main loop: all work happens in the event handlers registered in setup(),
// so the loop only has to let the BLE stack process pending events.
void loop() {
  BLE.poll();
}
// BLEConnected callback: log the address of the central that connected.
void blePeripheralConnectHandler(BLEDevice central) {
  const String centralAddress = central.address();
  Serial.print("Connected event, central: ");
  Serial.println(centralAddress);
}
// BLEDisconnected callback: log the address of the central that disconnected.
void blePeripheralDisconnectHandler(BLEDevice central) {
  const String centralAddress = central.address();
  Serial.print("Disconnected event, central: ");
  Serial.println(centralAddress);
}
// BLEWritten callback for the RX characteristic: log the received text and
// its length, then send the upper-cased text back through the TX
// characteristic. The callback arguments are not used; the global
// rxCharacteristic is read directly.
void rxCharacteristicWritten(BLEDevice central, BLECharacteristic characteristic) {
  Serial.print("Characteristic event, written: ");

  const String lengthText = "len=" + String(rxCharacteristic.valueLength());
  Serial.println(lengthText);

  // Log the text exactly as received.
  const String received = rxCharacteristic.value();
  Serial.println(received);

  // Upper-case a copy, log it and publish it on the TX characteristic.
  String echoed = received;
  echoed.toUpperCase();
  Serial.println(echoed);
  txCharacteristic.setValue(echoed);
}