esp_prov: switch from bluez/dbus to bleak

to enable multiplatform ble compatibility
This commit is contained in:
Cody Cutrer 2022-06-07 10:16:23 -06:00 committed by Laukik Hase
parent aa786b34c2
commit 069b82401c
No known key found for this signature in database
GPG Key ID: 11C571361F51A199
11 changed files with 220 additions and 484 deletions

View File

@ -2199,11 +2199,6 @@ tools/esp_prov/security/security.py
tools/esp_prov/security/security0.py
tools/esp_prov/security/security1.py
tools/esp_prov/transport/__init__.py
tools/esp_prov/transport/ble_cli.py
tools/esp_prov/transport/transport.py
tools/esp_prov/transport/transport_ble.py
tools/esp_prov/transport/transport_console.py
tools/esp_prov/transport/transport_http.py
tools/esp_prov/utils/__init__.py
tools/find_apps.py
tools/find_build_apps/__init__.py

View File

@ -86,15 +86,12 @@ Usage of `esp-prov` assumes that the provisioning app has specific protocomm end
# AVAILABILITY
`esp_prov` is intended as a cross-platform tool, but currently BLE communication functionality is only available on Linux (via BlueZ and DBus)
For Android, a provisioning tool along with source code is available [here](https://github.com/espressif/esp-idf-provisioning-android)
On macOS and Windows, running with `--transport ble` option falls back to console mode, ie. write data and target UUID are printed to STDOUT and read data is input through STDIN. Users are free to use their app of choice to connect to the BLE device, send the write data to the target characteristic and read from it.
## Dependencies
This requires the following python libraries to run (included in requirements.txt):
* `bleak`
* `future`
* `protobuf`
* `cryptography`
@ -103,14 +100,6 @@ Run `pip install -r $IDF_PATH/tools/esp_prov/requirements.txt`
Note :
* The packages listed in requirements.txt are limited only to the ones needed AFTER fully satisfying the requirements of ESP-IDF
* BLE communication is only supported on Linux (via Bluez and DBus), therefore, the dependencies for this have been made optional
## Optional Dependencies (Linux Only)
These dependencies are for enabling communication with BLE devices using Bluez and DBus on Linux:
* `dbus-python`
Run `pip install -r $IDF_PATH/tools/esp_prov/requirements_linux_extra.txt`
# EXAMPLE USAGE

View File

@ -7,6 +7,7 @@
from __future__ import print_function
import argparse
import asyncio
import json
import os
import sys
@ -50,7 +51,7 @@ def get_security(secver, username, password, pop='', verbose=False):
return None
def get_transport(sel_transport, service_name):
async def get_transport(sel_transport, service_name):
try:
tp = None
if (sel_transport == 'softap'):
@ -68,9 +69,9 @@ def get_transport(sel_transport, service_name):
# in which case, the automated discovery will fail and the client
# will fallback to using the provided UUIDs instead
nu_lookup = {'prov-session': 'ff51', 'prov-config': 'ff52', 'proto-ver': 'ff53'}
tp = transport.Transport_BLE(devname=service_name,
service_uuid='021a9004-0382-4aea-bff4-6b3f1c5adfb4',
tp = transport.Transport_BLE(service_uuid='021a9004-0382-4aea-bff4-6b3f1c5adfb4',
nu_lookup=nu_lookup)
await tp.connect(devname=service_name)
elif (sel_transport == 'console'):
tp = transport.Transport_Console()
return tp
@ -79,9 +80,9 @@ def get_transport(sel_transport, service_name):
return None
def version_match(tp, protover, verbose=False):
async def version_match(tp, protover, verbose=False):
try:
response = tp.send_data('proto-ver', protover)
response = await tp.send_data('proto-ver', protover)
if verbose:
print('proto-ver response : ', response)
@ -108,11 +109,11 @@ def version_match(tp, protover, verbose=False):
return None
def has_capability(tp, capability='none', verbose=False):
async def has_capability(tp, capability='none', verbose=False):
# Note : default value of `capability` argument cannot be empty string
# because protocomm_httpd expects non zero content lengths
try:
response = tp.send_data('proto-ver', capability)
response = await tp.send_data('proto-ver', capability)
if verbose:
print('proto-ver response : ', response)
@ -142,24 +143,24 @@ def has_capability(tp, capability='none', verbose=False):
return False
def get_version(tp):
async def get_version(tp):
response = None
try:
response = tp.send_data('proto-ver', '---')
response = await tp.send_data('proto-ver', '---')
except RuntimeError as e:
on_except(e)
response = ''
return response
def establish_session(tp, sec):
async def establish_session(tp, sec):
try:
response = None
while True:
request = sec.security_session(response)
if request is None:
break
response = tp.send_data('prov-session', request)
response = await tp.send_data('prov-session', request)
if (response is None):
return False
return True
@ -168,27 +169,27 @@ def establish_session(tp, sec):
return None
def custom_config(tp, sec, custom_info, custom_ver):
async def custom_config(tp, sec, custom_info, custom_ver):
try:
message = prov.custom_config_request(sec, custom_info, custom_ver)
response = tp.send_data('custom-config', message)
response = await tp.send_data('custom-config', message)
return (prov.custom_config_response(sec, response) == 0)
except RuntimeError as e:
on_except(e)
return None
def custom_data(tp, sec, custom_data):
async def custom_data(tp, sec, custom_data):
try:
message = prov.custom_data_request(sec, custom_data)
response = tp.send_data('custom-data', message)
response = await tp.send_data('custom-data', message)
return (prov.custom_data_response(sec, response) == 0)
except RuntimeError as e:
on_except(e)
return None
def scan_wifi_APs(sel_transport, tp, sec):
async def scan_wifi_APs(sel_transport, tp, sec):
APs = []
group_channels = 0
readlen = 100
@ -211,13 +212,13 @@ def scan_wifi_APs(sel_transport, tp, sec):
try:
message = prov.scan_start_request(sec, blocking=True, group_channels=group_channels)
start_time = time.time()
response = tp.send_data('prov-scan', message)
response = await tp.send_data('prov-scan', message)
stop_time = time.time()
print('++++ Scan process executed in ' + str(stop_time - start_time) + ' sec')
prov.scan_start_response(sec, response)
message = prov.scan_status_request(sec)
response = tp.send_data('prov-scan', message)
response = await tp.send_data('prov-scan', message)
result = prov.scan_status_response(sec, response)
print('++++ Scan results : ' + str(result['count']))
if result['count'] != 0:
@ -226,7 +227,7 @@ def scan_wifi_APs(sel_transport, tp, sec):
while remaining:
count = [remaining, readlen][remaining > readlen]
message = prov.scan_result_request(sec, index, count)
response = tp.send_data('prov-scan', message)
response = await tp.send_data('prov-scan', message)
APs += prov.scan_result_response(sec, response)
remaining -= count
index += count
@ -238,37 +239,37 @@ def scan_wifi_APs(sel_transport, tp, sec):
return APs
def send_wifi_config(tp, sec, ssid, passphrase):
async def send_wifi_config(tp, sec, ssid, passphrase):
try:
message = prov.config_set_config_request(sec, ssid, passphrase)
response = tp.send_data('prov-config', message)
response = await tp.send_data('prov-config', message)
return (prov.config_set_config_response(sec, response) == 0)
except RuntimeError as e:
on_except(e)
return None
def apply_wifi_config(tp, sec):
async def apply_wifi_config(tp, sec):
try:
message = prov.config_apply_config_request(sec)
response = tp.send_data('prov-config', message)
response = await tp.send_data('prov-config', message)
return (prov.config_apply_config_response(sec, response) == 0)
except RuntimeError as e:
on_except(e)
return None
def get_wifi_config(tp, sec):
async def get_wifi_config(tp, sec):
try:
message = prov.config_get_status_request(sec)
response = tp.send_data('prov-config', message)
response = await tp.send_data('prov-config', message)
return prov.config_get_status_response(sec, response)
except RuntimeError as e:
on_except(e)
return None
def wait_wifi_connected(tp, sec):
async def wait_wifi_connected(tp, sec):
"""
Wait for provisioning to report Wi-Fi is connected
@ -280,7 +281,7 @@ def wait_wifi_connected(tp, sec):
while True:
time.sleep(TIME_PER_POLL)
print('\n==== Wi-Fi connection state ====')
ret = get_wifi_config(tp, sec)
ret = await get_wifi_config(tp, sec)
if ret == 'connecting':
continue
elif ret == 'connected':
@ -301,7 +302,7 @@ def desc_format(*args):
return desc
if __name__ == '__main__':
async def main():
parser = argparse.ArgumentParser(description=desc_format(
'ESP Provisioning tool for configuring devices '
'running protocomm based provisioning service.',
@ -388,110 +389,116 @@ if __name__ == '__main__':
security.sec2_gen_salt_verifier(args.sec2_usr, args.sec2_pwd, args.sec2_salt_len)
exit(0)
obj_transport = get_transport(args.mode.lower(), args.name)
obj_transport = await get_transport(args.mode.lower(), args.name)
if obj_transport is None:
print('---- Failed to establish connection ----')
exit(1)
# If security version not specified check in capabilities
if args.secver is None:
# First check if capabilities are supported or not
if not has_capability(obj_transport):
print('Security capabilities could not be determined. Please specify "--sec_ver" explicitly')
try:
# If security version not specified check in capabilities
if args.secver is None:
# First check if capabilities are supported or not
if not await has_capability(obj_transport):
print('Security capabilities could not be determined. Please specify "--sec_ver" explicitly')
print('---- Invalid Security Version ----')
exit(2)
# When no_sec is present, use security 0, else security 1
args.secver = int(not await has_capability(obj_transport, 'no_sec'))
print('Security scheme determined to be :', args.secver)
if (args.secver != 0) and not await has_capability(obj_transport, 'no_pop'):
if len(args.sec1_pop) == 0:
print('---- Proof of Possession argument not provided ----')
exit(2)
elif len(args.sec1_pop) != 0:
print('---- Proof of Possession will be ignored ----')
args.sec1_pop = ''
obj_security = get_security(args.secver, args.sec2_usr, args.sec2_pwd, args.sec1_pop, args.verbose)
if obj_security is None:
print('---- Invalid Security Version ----')
exit(2)
# When no_sec is present, use security 0, else security 1
args.secver = int(not has_capability(obj_transport, 'no_sec'))
print('Security scheme determined to be :', args.secver)
if args.version != '':
print('\n==== Verifying protocol version ====')
if not await version_match(obj_transport, args.version, args.verbose):
print('---- Error in protocol version matching ----')
exit(3)
print('==== Verified protocol version successfully ====')
if (args.secver != 0) and not has_capability(obj_transport, 'no_pop'):
if len(args.sec1_pop) == 0:
print('---- Proof of Possession argument not provided ----')
exit(2)
elif len(args.sec1_pop) != 0:
print('---- Proof of Possession will be ignored ----')
args.sec1_pop = ''
print('\n==== Starting Session ====')
if not await establish_session(obj_transport, obj_security):
print('Failed to establish session. Ensure that security scheme and proof of possession are correct')
print('---- Error in establishing session ----')
exit(4)
print('==== Session Established ====')
obj_security = get_security(args.secver, args.sec2_usr, args.sec2_pwd, args.sec1_pop, args.verbose)
if obj_security is None:
print('---- Invalid Security Version ----')
exit(2)
if args.custom_data != '':
print('\n==== Sending Custom data to esp32 ====')
if not await custom_data(obj_transport, obj_security, args.custom_data):
print('---- Error in custom data ----')
exit(5)
print('==== Custom data sent successfully ====')
if args.version != '':
print('\n==== Verifying protocol version ====')
if not version_match(obj_transport, args.version, args.verbose):
print('---- Error in protocol version matching ----')
exit(3)
print('==== Verified protocol version successfully ====')
print('\n==== Starting Session ====')
if not establish_session(obj_transport, obj_security):
print('Failed to establish session. Ensure that security scheme and proof of possession are correct')
print('---- Error in establishing session ----')
exit(4)
print('==== Session Established ====')
if args.custom_data != '':
print('\n==== Sending Custom data to esp32 ====')
if not custom_data(obj_transport, obj_security, args.custom_data):
print('---- Error in custom data ----')
exit(5)
print('==== Custom data sent successfully ====')
if args.ssid == '':
if not has_capability(obj_transport, 'wifi_scan'):
print('---- Wi-Fi Scan List is not supported by provisioning service ----')
print('---- Rerun esp_prov with SSID and Passphrase as argument ----')
exit(3)
while True:
print('\n==== Scanning Wi-Fi APs ====')
start_time = time.time()
APs = scan_wifi_APs(args.mode.lower(), obj_transport, obj_security)
end_time = time.time()
print('\n++++ Scan finished in ' + str(end_time - start_time) + ' sec')
if APs is None:
print('---- Error in scanning Wi-Fi APs ----')
exit(8)
if len(APs) == 0:
print('No APs found!')
exit(9)
print('==== Wi-Fi Scan results ====')
print('{0: >4} {1: <33} {2: <12} {3: >4} {4: <4} {5: <16}'.format(
'S.N.', 'SSID', 'BSSID', 'CHN', 'RSSI', 'AUTH'))
for i in range(len(APs)):
print('[{0: >2}] {1: <33} {2: <12} {3: >4} {4: <4} {5: <16}'.format(
i + 1, APs[i]['ssid'], APs[i]['bssid'], APs[i]['channel'], APs[i]['rssi'], APs[i]['auth']))
if args.ssid == '':
if not await has_capability(obj_transport, 'wifi_scan'):
print('---- Wi-Fi Scan List is not supported by provisioning service ----')
print('---- Rerun esp_prov with SSID and Passphrase as argument ----')
exit(3)
while True:
try:
select = int(binput('Select AP by number (0 to rescan) : '))
if select < 0 or select > len(APs):
raise ValueError
print('\n==== Scanning Wi-Fi APs ====')
start_time = time.time()
APs = await scan_wifi_APs(args.mode.lower(), obj_transport, obj_security)
end_time = time.time()
print('\n++++ Scan finished in ' + str(end_time - start_time) + ' sec')
if APs is None:
print('---- Error in scanning Wi-Fi APs ----')
exit(8)
if len(APs) == 0:
print('No APs found!')
exit(9)
print('==== Wi-Fi Scan results ====')
print('{0: >4} {1: <33} {2: <12} {3: >4} {4: <4} {5: <16}'.format(
'S.N.', 'SSID', 'BSSID', 'CHN', 'RSSI', 'AUTH'))
for i in range(len(APs)):
print('[{0: >2}] {1: <33} {2: <12} {3: >4} {4: <4} {5: <16}'.format(
i + 1, APs[i]['ssid'], APs[i]['bssid'], APs[i]['channel'], APs[i]['rssi'], APs[i]['auth']))
while True:
try:
select = int(binput('Select AP by number (0 to rescan) : '))
if select < 0 or select > len(APs):
raise ValueError
break
except ValueError:
print('Invalid input! Retry')
if select != 0:
break
except ValueError:
print('Invalid input! Retry')
if select != 0:
break
args.ssid = APs[select - 1]['ssid']
prompt_str = 'Enter passphrase for {0} : '.format(args.ssid)
args.passphrase = getpass(prompt_str)
args.ssid = APs[select - 1]['ssid']
prompt_str = 'Enter passphrase for {0} : '.format(args.ssid)
args.passphrase = getpass(prompt_str)
print('\n==== Sending Wi-Fi credential to esp32 ====')
if not await send_wifi_config(obj_transport, obj_security, args.ssid, args.passphrase):
print('---- Error in send Wi-Fi config ----')
exit(6)
print('==== Wi-Fi Credentials sent successfully ====')
print('\n==== Sending Wi-Fi credential to esp32 ====')
if not send_wifi_config(obj_transport, obj_security, args.ssid, args.passphrase):
print('---- Error in send Wi-Fi config ----')
exit(6)
print('==== Wi-Fi Credentials sent successfully ====')
print('\n==== Applying config to esp32 ====')
if not await apply_wifi_config(obj_transport, obj_security):
print('---- Error in apply Wi-Fi config ----')
exit(7)
print('==== Apply config sent successfully ====')
print('\n==== Applying config to esp32 ====')
if not apply_wifi_config(obj_transport, obj_security):
print('---- Error in apply Wi-Fi config ----')
exit(7)
print('==== Apply config sent successfully ====')
await wait_wifi_connected(obj_transport, obj_security)
finally:
await obj_transport.disconnect()
wait_wifi_connected(obj_transport, obj_security)
if __name__ == '__main__':
asyncio.run(main())

View File

@ -19,7 +19,7 @@ def custom_data_request(security_ctx, data):
# Encrypt the custom data
enc_cmd = security_ctx.encrypt_data(tobytes(data))
print_verbose(security_ctx, 'Client -> Device (CustomData cmd) ' + utils.str_to_hexstr(enc_cmd))
return enc_cmd
return enc_cmd.decode('latin-1')
def custom_data_response(security_ctx, response_data):

View File

@ -1,3 +1,4 @@
bleak
future
cryptography
protobuf

View File

@ -1 +0,0 @@
dbus-python

View File

@ -1,16 +1,5 @@
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
#
from __future__ import print_function
@ -19,33 +8,26 @@ import platform
from builtins import input
import utils
from future.utils import iteritems
fallback = True
# Check if platform is Linux and required packages are installed
# Check if required packages are installed
# else fallback to console mode
if platform.system() == 'Linux':
try:
import time
import dbus
import dbus.mainloop.glib
fallback = False
except ImportError:
pass
try:
import bleak
fallback = False
except ImportError:
pass
# --------------------------------------------------------------------
# BLE client (Linux Only) using Bluez and DBus
class BLE_Bluez_Client:
class BLE_Bleak_Client:
def __init__(self):
self.adapter_props = None
def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
async def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
self.devname = devname
self.srv_uuid_fallback = fallback_srv_uuid
self.chrc_names = [name.lower() for name in chrc_names]
@ -56,117 +38,19 @@ class BLE_Bluez_Client:
self.characteristics = dict()
self.srv_uuid_adv = None
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
objects = manager.GetManagedObjects()
adapter_path = None
for path, interfaces in iteritems(objects):
adapter = interfaces.get('org.bluez.Adapter1')
if adapter is not None:
if path.endswith(iface):
self.adapter = dbus.Interface(bus.get_object('org.bluez', path), 'org.bluez.Adapter1')
self.adapter_props = dbus.Interface(bus.get_object('org.bluez', path), 'org.freedesktop.DBus.Properties')
adapter_path = path
break
if self.adapter is None:
raise RuntimeError('Bluetooth adapter not found')
# Power on bluetooth adapter
self.adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(1))
print('checking if adapter is powered on')
for cnt in range(10, 0, -1):
time.sleep(5)
powered_on = self.adapter_props.Get('org.bluez.Adapter1', 'Powered')
if powered_on == 1:
# Set adapter props again with powered on value
self.adapter_props = dbus.Interface(bus.get_object('org.bluez', adapter_path), 'org.freedesktop.DBus.Properties')
print('bluetooth adapter powered on')
break
print('number of retries left({})'.format(cnt - 1))
if powered_on == 0:
raise RuntimeError('Failed to starte bluetooth adapter')
# Start discovery if not already discovering
started_discovery = 0
discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
if discovery_val == 0:
print('starting discovery')
self.adapter.StartDiscovery()
# Set as start discovery is called
started_discovery = 1
for cnt in range(10, 0, -1):
time.sleep(5)
discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
if discovery_val == 1:
print('start discovery successful')
break
print('number of retries left ({})'.format(cnt - 1))
if discovery_val == 0:
print('start discovery failed')
raise RuntimeError('Failed to start discovery')
retry = 10
while (retry > 0):
try:
if self.device is None:
print('Connecting...')
# Wait for device to be discovered
time.sleep(5)
connected = self._connect_()
if connected:
print('Connected')
else:
return False
print('Getting Services...')
# Wait for services to be discovered
time.sleep(5)
self._get_services_()
return True
except Exception as e:
print(e)
retry -= 1
print('Retries left', retry)
continue
# Call StopDiscovery() for corresponding StartDiscovery() session
if started_discovery == 1:
print('stopping discovery')
self.adapter.StopDiscovery()
for cnt in range(10, 0, -1):
time.sleep(5)
discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
if discovery_val == 0:
print('stop discovery successful')
break
print('number of retries left ({})'.format(cnt - 1))
if discovery_val == 1:
print('stop discovery failed')
return False
def _connect_(self):
bus = dbus.SystemBus()
manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
objects = manager.GetManagedObjects()
dev_path = None
for path, interfaces in iteritems(objects):
if 'org.bluez.Device1' not in interfaces:
continue
if interfaces['org.bluez.Device1'].get('Name') == self.devname:
dev_path = path
break
if dev_path is None:
raise RuntimeError('BLE device not found')
print('Discovering...')
try:
self.device = bus.get_object('org.bluez', dev_path)
try:
uuids = self.device.Get('org.bluez.Device1', 'UUIDs',
dbus_interface='org.freedesktop.DBus.Properties')
devices = await bleak.discover()
except bleak.exc.BleakDBusError as e:
if str(e) == '[org.bluez.Error.NotReady] Resource Not Ready':
raise RuntimeError('Bluetooth is not ready. Maybe try `bluetoothctl power on`?')
raise
address = None
for d in devices:
if d.name == self.devname:
address = d.address
uuids = d.metadata['uuids']
# There should be 1 service UUID in advertising data
# If bluez had cached an old version of the advertisement data
# the list of uuids may be incorrect, in which case connection
@ -174,172 +58,78 @@ class BLE_Bluez_Client:
# the cache will be refreshed before next retry
if len(uuids) == 1:
self.srv_uuid_adv = uuids[0]
except dbus.exceptions.DBusException as e:
raise RuntimeError(e)
if not address:
raise RuntimeError('Device not found')
self.device.Connect(dbus_interface='org.bluez.Device1')
# Check device is connected successfully
for cnt in range(10, 0, -1):
time.sleep(5)
device_conn = self.device.Get(
'org.bluez.Device1',
'Connected',
dbus_interface='org.freedesktop.DBus.Properties')
if device_conn == 1:
print('device is connected')
break
print('number of retries left ({})'.format(cnt - 1))
if device_conn == 0:
print('failed to connect device')
return False
print('Connecting...')
self.device = bleak.BleakClient(address)
await self.device.connect()
# must be paired on Windows to access characteristics;
# cannot be paired on Mac
if platform.system() == 'Windows':
await self.device.pair()
return True
print('Getting Services...')
services = await self.device.get_services()
except Exception as e:
print(e)
service = services[self.srv_uuid_adv] or services[self.srv_uuid_fallback]
if not service:
await self.device.disconnect()
self.device = None
raise RuntimeError('BLE device could not connect')
raise RuntimeError('Provisioning service not found')
def _get_services_(self):
bus = dbus.SystemBus()
manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
objects = manager.GetManagedObjects()
service_found = False
for srv_path, srv_interfaces in iteritems(objects):
if 'org.bluez.GattService1' not in srv_interfaces:
continue
if not srv_path.startswith(self.device.object_path):
continue
service = bus.get_object('org.bluez', srv_path)
srv_uuid = service.Get('org.bluez.GattService1', 'UUID',
dbus_interface='org.freedesktop.DBus.Properties')
# If service UUID doesn't match the one found in advertisement data
# then also check if it matches the fallback UUID
if srv_uuid not in [self.srv_uuid_adv, self.srv_uuid_fallback]:
continue
nu_lookup = dict()
characteristics = dict()
for chrc_path, chrc_interfaces in iteritems(objects):
if 'org.bluez.GattCharacteristic1' not in chrc_interfaces:
nu_lookup = dict()
for characteristic in service.characteristics:
for descriptor in characteristic.descriptors:
if descriptor.uuid[4:8] != '2901':
continue
if not chrc_path.startswith(service.object_path):
continue
chrc = bus.get_object('org.bluez', chrc_path)
uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID',
dbus_interface='org.freedesktop.DBus.Properties')
characteristics[uuid] = chrc
for desc_path, desc_interfaces in iteritems(objects):
if 'org.bluez.GattDescriptor1' not in desc_interfaces:
continue
if not desc_path.startswith(chrc.object_path):
continue
desc = bus.get_object('org.bluez', desc_path)
desc_uuid = desc.Get('org.bluez.GattDescriptor1', 'UUID',
dbus_interface='org.freedesktop.DBus.Properties')
if desc_uuid[4:8] != '2901':
continue
try:
readval = desc.ReadValue({}, dbus_interface='org.bluez.GattDescriptor1')
except dbus.exceptions.DBusException as err:
raise RuntimeError('Failed to read value for descriptor while getting services - {}'.format(err))
found_name = ''.join(chr(b) for b in readval).lower()
nu_lookup[found_name] = uuid
break
readval = await self.device.read_gatt_descriptor(descriptor.handle)
found_name = ''.join(chr(b) for b in readval).lower()
nu_lookup[found_name] = characteristic.uuid
self.characteristics[characteristic.uuid] = characteristic
match_found = True
for name in self.chrc_names:
if name not in nu_lookup:
# Endpoint name not present
match_found = False
break
# Create lookup table only if all endpoint names found
self.nu_lookup = [None, nu_lookup][match_found]
self.characteristics = characteristics
service_found = True
# If the service UUID matches that in the advertisement
# we can stop the search now. If it doesn't match, we
# have found the service corresponding to the fallback
# UUID, in which case don't break and keep searching
# for the advertised service
if srv_uuid == self.srv_uuid_adv:
match_found = True
for name in self.chrc_names:
if name not in nu_lookup:
# Endpoint name not present
match_found = False
break
if not service_found:
self.device.Disconnect(dbus_interface='org.bluez.Device1')
# Check if device is disconnected successfully
self._check_device_disconnected()
if self.adapter:
self.adapter.RemoveDevice(self.device)
self.device = None
self.nu_lookup = None
self.characteristics = dict()
raise RuntimeError('Provisioning service not found')
# Create lookup table only if all endpoint names found
self.nu_lookup = [None, nu_lookup][match_found]
return True
def get_nu_lookup(self):
return self.nu_lookup
def has_characteristic(self, uuid):
print('checking for characteristic ' + uuid)
if uuid in self.characteristics:
return True
return False
def disconnect(self):
async def disconnect(self):
if self.device:
self.device.Disconnect(dbus_interface='org.bluez.Device1')
# Check if device is disconnected successfully
self._check_device_disconnected()
if self.adapter:
self.adapter.RemoveDevice(self.device)
print('Disconnecting...')
if platform.system() == 'Windows':
await self.device.unpair()
await self.device.disconnect()
self.device = None
self.nu_lookup = None
self.characteristics = dict()
if self.adapter_props:
self.adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(0))
def _check_device_disconnected(self):
for cnt in range(10, 0, -1):
time.sleep(5)
device_conn = self.device.Get(
'org.bluez.Device1',
'Connected',
dbus_interface='org.freedesktop.DBus.Properties')
if device_conn == 0:
print('device disconnected')
break
print('number of retries left ({})'.format(cnt - 1))
if device_conn == 1:
print('failed to disconnect device')
def send_data(self, characteristic_uuid, data):
try:
path = self.characteristics[characteristic_uuid]
except KeyError:
raise RuntimeError('Invalid characteristic : ' + characteristic_uuid)
try:
path.WriteValue([ord(c) for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1')
except TypeError: # python3 compatible
path.WriteValue([c for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1')
except dbus.exceptions.DBusException as e:
raise RuntimeError('Failed to write value to characteristic ' + characteristic_uuid + ': ' + str(e))
try:
readval = path.ReadValue({}, dbus_interface='org.bluez.GattCharacteristic1')
except dbus.exceptions.DBusException as e:
raise RuntimeError('Failed to read value from characteristic ' + characteristic_uuid + ': ' + str(e))
async def send_data(self, characteristic_uuid, data):
await self.device.write_gatt_char(characteristic_uuid, bytearray(data.encode('latin-1')), True)
readval = await self.device.read_gatt_char(characteristic_uuid)
return ''.join(chr(b) for b in readval)
# --------------------------------------------------------------------
# Console based BLE client for Cross Platform support
class BLE_Console_Client:
def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
async def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
print('BLE client is running in console mode')
print('\tThis could be due to your platform not being supported or dependencies not being met')
print('\tPlease ensure all pre-requisites are met to run the full fledged client')
@ -362,10 +152,10 @@ class BLE_Console_Client:
return False
return True
def disconnect(self):
async def disconnect(self):
pass
def send_data(self, characteristic_uuid, data):
async def send_data(self, characteristic_uuid, data):
print("BLECLI >> Write following data to characteristic with UUID '" + characteristic_uuid + "' :")
print('\t>> ' + utils.str_to_hexstr(data))
print('BLECLI >> Enter data read from characteristic (in hex) :')
@ -380,4 +170,4 @@ class BLE_Console_Client:
def get_client():
if fallback:
return BLE_Console_Client()
return BLE_Bluez_Client()
return BLE_Bleak_Client()

View File

@ -1,16 +1,5 @@
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
#
# Base class for protocomm transport
@ -27,3 +16,6 @@ class Transport():
@abc.abstractmethod
def send_config_data(self, data):
pass
async def disconnect(self):
pass

View File

@ -1,16 +1,5 @@
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
#
from __future__ import print_function
@ -20,7 +9,9 @@ from .transport import Transport
class Transport_BLE(Transport):
def __init__(self, devname, service_uuid, nu_lookup):
def __init__(self, service_uuid, nu_lookup):
self.nu_lookup = nu_lookup
self.service_uuid = service_uuid
# Expect service UUID like '0000ffff-0000-1000-8000-00805f9b34fb'
for name in nu_lookup.keys():
# Calculate characteristic UUID for each endpoint
@ -30,10 +21,11 @@ class Transport_BLE(Transport):
# Get BLE client module
self.cli = ble_cli.get_client()
async def connect(self, devname):
# Use client to connect to BLE device and bind to service
if not self.cli.connect(devname=devname, iface='hci0',
chrc_names=nu_lookup.keys(),
fallback_srv_uuid=service_uuid):
if not await self.cli.connect(devname=devname, iface='hci0',
chrc_names=self.nu_lookup.keys(),
fallback_srv_uuid=self.service_uuid):
raise RuntimeError('Failed to initialize transport')
# Irrespective of provided parameters, let the client
@ -43,24 +35,17 @@ class Transport_BLE(Transport):
# If that doesn't work, use the lookup table provided as parameter
if self.name_uuid_lookup is None:
self.name_uuid_lookup = nu_lookup
self.name_uuid_lookup = self.nu_lookup
# Check if expected characteristics are provided by the service
for name in self.name_uuid_lookup.keys():
if not self.cli.has_characteristic(self.name_uuid_lookup[name]):
raise RuntimeError("'" + name + "' endpoint not found")
def __del__(self):
# Make sure device is disconnected before application gets closed
try:
self.disconnect()
except Exception:
pass
async def disconnect(self):
await self.cli.disconnect()
def disconnect(self):
self.cli.disconnect()
def send_data(self, ep_name, data):
async def send_data(self, ep_name, data):
# Write (and read) data to characteristic corresponding to the endpoint
if ep_name not in self.name_uuid_lookup.keys():
raise RuntimeError('Invalid endpoint : ' + ep_name)
return self.cli.send_data(self.name_uuid_lookup[ep_name], data)
return await self.cli.send_data(self.name_uuid_lookup[ep_name], data)

View File

@ -1,16 +1,5 @@
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
#
from __future__ import print_function
@ -24,7 +13,7 @@ from .transport import Transport
class Transport_Console(Transport):
def send_data(self, path, data, session_id=0):
async def send_data(self, path, data, session_id=0):
print('Client->Device msg :', path, session_id, utils.str_to_hexstr(data))
try:
resp = input('Enter device->client msg : ')

View File

@ -1,16 +1,5 @@
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
#
from __future__ import print_function
@ -23,7 +12,7 @@ try:
from http.client import HTTPConnection, HTTPSConnection
except ImportError:
# Python 2 fallback
from httplib import HTTPConnection, HTTPSConnection
from httplib import HTTPConnection, HTTPSConnection # type: ignore
from .transport import Transport
@ -56,5 +45,5 @@ class Transport_HTTP(Transport):
raise RuntimeError('Connection Failure : ' + str(err))
raise RuntimeError('Server responded with error code ' + str(response.status))
def send_data(self, ep_name, data):
async def send_data(self, ep_name, data):
return self._send_post_request('/' + ep_name, data)