Merge branch 'feature/nimble_example_tests' into 'master'

Add NimBLE bleprph,blecent,blehr example tests

See merge request espressif/esp-idf!4420
This commit is contained in:
Angus Gratton 2019-07-05 08:14:05 +08:00
commit fda1887260
10 changed files with 1950 additions and 2 deletions

View File

@ -20,6 +20,12 @@ This example aims at understanding BLE service discovery, connection and charact
To test this demo, use any BLE GATT server app that advertises support for the Alert Notification service (0x1811) and includes it in the GATT database. To test this demo, use any BLE GATT server app that advertises support for the Alert Notification service (0x1811) and includes it in the GATT database.
A Python based utility `blecent_test.py` is also provided (which will run as a BLE GATT server) and can be used to test this example.
Note :
* Make sure to run `python -m pip install --user -r $IDF_PATH/requirements.txt -r $IDF_PATH/tools/ble/requirements.txt` to install the dependency packages needed.
* Currently this Python utility is only supported on Linux (BLE communication is via BLuez + DBus).
## How to use example ## How to use example
@ -45,7 +51,8 @@ See the Getting Started Guide for full steps to configure and use ESP-IDF to bui
## Example Output ## Example Output
There is this console output on successful connection: This is the console output on successful connection:
``` ```
I (202) BTDM_INIT: BT controller compile version [0b60040] I (202) BTDM_INIT: BT controller compile version [0b60040]
I (202) system_api: Base MAC address is not set, read default base MAC address from BLK0 of EFUSE I (202) system_api: Base MAC address is not set, read default base MAC address from BLK0 of EFUSE
@ -71,7 +78,8 @@ Write complete; status=0 conn_handle=0 attr_handle=47
Subscribe complete; status=0 conn_handle=0 attr_handle=43 Subscribe complete; status=0 conn_handle=0 attr_handle=43
``` ```
There is this console output on failure (or peripheral does not support New Alert Service category): This is the console output on failure (or peripheral does not support New Alert Service category):
``` ```
I (180) BTDM_INIT: BT controller compile version [8e87ec7] I (180) BTDM_INIT: BT controller compile version [8e87ec7]
I (180) system_api: Base MAC address is not set, read default base MAC address from BLK0 of EFUSE I (180) system_api: Base MAC address is not set, read default base MAC address from BLK0 of EFUSE
@ -92,3 +100,60 @@ Error: Peer doesn't support the Supported New Alert Category characteristic
GAP procedure initiated: terminate connection; conn_handle=0 hci_reason=19 GAP procedure initiated: terminate connection; conn_handle=0 hci_reason=19
disconnect; reason=534 disconnect; reason=534
``` ```
## Running Python Utility
```
python blecent_test.py
```
## Python Utility Output
This is this output seen on the python side on successful connection:
```
discovering adapter...
bluetooth adapter discovered
powering on adapter...
bluetooth adapter powered on
Advertising started
GATT Data created
GATT Application registered
Advertising data created
Advertisement registered
Read Request received
SupportedNewAlertCategoryCharacteristic
Value: [dbus.Byte(2)]
Write Request received
AlertNotificationControlPointCharacteristic
Current value: [dbus.Byte(0)]
New value: [dbus.Byte(99), dbus.Byte(100)]
Notify Started
New value on write: [dbus.Byte(1), dbus.Byte(0)]
Value on read: [dbus.Byte(1), dbus.Byte(0)]
Notify Stopped
exiting from test...
GATT Data removed
GATT Application unregistered
Advertising data removed
Advertisement unregistered
Stop Advertising status: True
disconnecting device...
device disconnected
powering off adapter...
bluetooth adapter powered off
Service discovery passed
Service Discovery Status: 0
Read passed
SupportedNewAlertCategoryCharacteristic
Read Status: 0
Write passed
AlertNotificationControlPointCharacteristic
Write Status: 0
Subscribe passed
ClientCharacteristicConfigurationDescriptor
Subscribe Status: 0
```

View File

@ -0,0 +1,129 @@
#!/usr/bin/env python
#
# Copyright 2019 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import sys
import re
import uuid
import subprocess
try:
# This environment variable is expected on the host machine
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
except ImportError as e:
print(e)
print("\nCheck your IDF_PATH\nOR")
print("Try `export TEST_FW_PATH=$IDF_PATH/tools/tiny-test-fw` for resolving the issue\nOR")
print("Try `pip install -r $IDF_PATH/tools/tiny-test-fw/requirements.txt` for resolving the issue")
import IDF
try:
import lib_ble_client
except ImportError:
lib_ble_client_path = os.getenv("IDF_PATH") + "/tools/ble"
if lib_ble_client_path and lib_ble_client_path not in sys.path:
sys.path.insert(0, lib_ble_client_path)
import lib_ble_client
import Utility
# When running on local machine execute the following before running this script
# > make app bootloader
# > make print_flash_cmd | tail -n 1 > build/download.config
# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw
@IDF.idf_example_test(env_tag="Example_WIFI_BT")
def test_example_app_ble_central(env, extra_data):
"""
Steps:
1. Discover Bluetooth Adapter and Power On
"""
interface = 'hci0'
adv_host_name = "BleCentTestApp"
adv_iface_index = 0
adv_type = 'peripheral'
adv_uuid = '1811'
# Acquire DUT
dut = env.get_dut("blecent", "examples/bluetooth/nimble/blecent")
# Get binary file
binary_file = os.path.join(dut.app.binary_path, "blecent.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("blecent_bin_size", "{}KB".format(bin_size // 1024))
# Upload binary and start testing
Utility.console_log("Starting blecent example test app")
dut.start_app()
subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*'])
device_addr = ':'.join(re.findall('..', '%012x' % uuid.getnode()))
# Get BLE client module
ble_client_obj = lib_ble_client.BLE_Bluez_Client(interface)
if not ble_client_obj:
raise RuntimeError("Get DBus-Bluez object failed !!")
# Discover Bluetooth Adapter and power on
is_adapter_set = ble_client_obj.set_adapter()
if not is_adapter_set:
raise RuntimeError("Adapter Power On failed !!")
# Write device address to dut
dut.expect("BLE Host Task Started", timeout=60)
dut.write(device_addr + "\n")
'''
Blecent application run:
Create GATT data
Register GATT Application
Create Advertising data
Register advertisement
Start advertising
'''
ble_client_obj.start_advertising(adv_host_name, adv_iface_index, adv_type, adv_uuid)
# Call disconnect to perform cleanup operations before exiting application
ble_client_obj.disconnect()
# Check dut responses
dut.expect("Connection established", timeout=30)
dut.expect("Service discovery complete; status=0", timeout=30)
print("Service discovery passed\n\tService Discovery Status: 0")
dut.expect("GATT procedure initiated: read;", timeout=30)
dut.expect("Read complete; status=0", timeout=30)
print("Read passed\n\tSupportedNewAlertCategoryCharacteristic\n\tRead Status: 0")
dut.expect("GATT procedure initiated: write;", timeout=30)
dut.expect("Write complete; status=0", timeout=30)
print("Write passed\n\tAlertNotificationControlPointCharacteristic\n\tWrite Status: 0")
dut.expect("GATT procedure initiated: write;", timeout=30)
dut.expect("Subscribe complete; status=0", timeout=30)
print("Subscribe passed\n\tClientCharacteristicConfigurationDescriptor\n\tSubscribe Status: 0")
if __name__ == '__main__':
test_example_app_ble_central()

View File

@ -10,6 +10,13 @@ This example aims at understanding notification subscriptions and sending notifi
To test this demo, any BLE scanner app can be used. To test this demo, any BLE scanner app can be used.
A Python based utility `blehr_test.py` is also provided (which will run as a BLE GATT Client) and can be used to test this example.
Note :
* Make sure to run `python -m pip install --user -r $IDF_PATH/requirements.txt -r $IDF_PATH/tools/ble/requirements.txt` to install the dependency packages needed.
* Currently this Python utility is only supported on Linux (BLE communication is via BLuez + DBus).
## How to use example ## How to use example
@ -59,3 +66,50 @@ GATT procedure initiated: notify; att_handle=3
``` ```
## Running Python Utility
```
python blehr_test.py
```
## Python Utility Output
This is this output seen on the python side on successful connection:
```
discovering adapter...
bluetooth adapter discovered
powering on adapter...
bluetooth adapter powered on
Started Discovery
Connecting to device...
Connected to device
Services
[dbus.String(u'00001801-0000-1000-8000-00805f9b34fb', variant_level=1), dbus.String(u'0000180d-0000-1000-8000-00805f9b34fb', variant_level=1), dbus.String(u'0000180a-0000-1000-8000-00805f9b34fb', variant_level=1)]
Subscribe to notifications: On
dbus.Array([dbus.Byte(6), dbus.Byte(90)], signature=dbus.Signature('y'), variant_level=1)
dbus.Array([dbus.Byte(6), dbus.Byte(91)], signature=dbus.Signature('y'), variant_level=1)
dbus.Array([dbus.Byte(6), dbus.Byte(92)], signature=dbus.Signature('y'), variant_level=1)
dbus.Array([dbus.Byte(6), dbus.Byte(93)], signature=dbus.Signature('y'), variant_level=1)
dbus.Array([dbus.Byte(6), dbus.Byte(94)], signature=dbus.Signature('y'), variant_level=1)
dbus.Array([dbus.Byte(6), dbus.Byte(95)], signature=dbus.Signature('y'), variant_level=1)
dbus.Array([dbus.Byte(6), dbus.Byte(96)], signature=dbus.Signature('y'), variant_level=1)
dbus.Array([dbus.Byte(6), dbus.Byte(97)], signature=dbus.Signature('y'), variant_level=1)
dbus.Array([dbus.Byte(6), dbus.Byte(98)], signature=dbus.Signature('y'), variant_level=1)
dbus.Array([dbus.Byte(6), dbus.Byte(99)], signature=dbus.Signature('y'), variant_level=1)
Subscribe to notifications: Off
Success: blehr example test passed
exiting from test...
disconnecting device...
device disconnected
powering off adapter...
bluetooth adapter powered off
```

View File

@ -0,0 +1,149 @@
#!/usr/bin/env python
#
# Copyright 2019 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import sys
import re
from threading import Thread
import subprocess
try:
# This environment variable is expected on the host machine
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
except ImportError as e:
print(e)
print("\nCheck your IDF_PATH\nOR")
print("Try `export TEST_FW_PATH=$IDF_PATH/tools/tiny-test-fw` for resolving the issue\nOR")
print("Try `pip install -r $IDF_PATH/tools/tiny-test-fw/requirements.txt` for resolving the issue\n")
import IDF
try:
import lib_ble_client
except ImportError:
lib_ble_client_path = os.getenv("IDF_PATH") + "/tools/ble"
if lib_ble_client_path and lib_ble_client_path not in sys.path:
sys.path.insert(0, lib_ble_client_path)
import lib_ble_client
import Utility
# When running on local machine execute the following before running this script
# > make app bootloader
# > make print_flash_cmd | tail -n 1 > build/download.config
# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw
def blehr_client_task(dut_addr, dut):
interface = 'hci0'
ble_devname = 'blehr_sensor_1.0'
hr_srv_uuid = '180d'
hr_char_uuid = '2a37'
# Get BLE client module
ble_client_obj = lib_ble_client.BLE_Bluez_Client(interface, devname=ble_devname, devaddr=dut_addr)
if not ble_client_obj:
raise RuntimeError("Failed to get DBus-Bluez object")
# Discover Bluetooth Adapter and power on
is_adapter_set = ble_client_obj.set_adapter()
if not is_adapter_set:
raise RuntimeError("Adapter Power On failed !!")
# Connect BLE Device
is_connected = ble_client_obj.connect()
if not is_connected:
Utility.console_log("Connection to device ", ble_devname, "failed !!")
# Call disconnect to perform cleanup operations before exiting application
ble_client_obj.disconnect()
return
# Read Services
services_ret = ble_client_obj.get_services()
if services_ret:
print("\nServices\n")
print(services_ret)
else:
print("Failure: Read Services failed")
ble_client_obj.disconnect()
return
'''
Blehr application run:
Start Notifications
Retrieve updated value
Stop Notifications
'''
blehr_ret = ble_client_obj.hr_update_simulation(hr_srv_uuid, hr_char_uuid)
if blehr_ret:
print("Success: blehr example test passed")
else:
print("Failure: blehr example test failed")
# Call disconnect to perform cleanup operations before exiting application
ble_client_obj.disconnect()
@IDF.idf_example_test(env_tag="Example_WIFI_BT")
def test_example_app_ble_hr(env, extra_data):
"""
Steps:
1. Discover Bluetooth Adapter and Power On
2. Connect BLE Device
3. Start Notifications
4. Updated value is retrieved
5. Stop Notifications
"""
try:
# Acquire DUT
dut = env.get_dut("blehr", "examples/bluetooth/nimble/blehr")
# Get binary file
binary_file = os.path.join(dut.app.binary_path, "blehr.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("blehr_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("blehr_bin_size", bin_size // 1024)
# Upload binary and start testing
Utility.console_log("Starting blehr simple example test app")
dut.start_app()
subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*'])
# Get device address from dut
dut_addr = dut.expect(re.compile(r"Device Address: ([a-fA-F0-9:]+)"), timeout=30)[0]
# Starting a py-client in a separate thread
thread1 = Thread(target=blehr_client_task, args=(dut_addr,dut,))
thread1.start()
thread1.join()
# Check dut responses
dut.expect("subscribe event; cur_notify=1", timeout=30)
dut.expect("GATT procedure initiated: notify;", timeout=30)
dut.expect("subscribe event; cur_notify=0", timeout=30)
dut.expect("disconnect;", timeout=30)
except Exception as e:
sys.exit(e)
if __name__ == '__main__':
test_example_app_ble_hr()

View File

@ -12,6 +12,12 @@ It also demonstrates security features of NimBLE stack. SMP parameters like I/O
To test this demo, any BLE scanner app can be used. To test this demo, any BLE scanner app can be used.
A Python based utility `bleprph_test.py` is also provided (which will run as a BLE GATT Client) and can be used to test this example.
Note :
* Make sure to run `python -m pip install --user -r $IDF_PATH/requirements.txt -r $IDF_PATH/tools/ble/requirements.txt` to install the dependency packages needed.
* Currently this Python utility is only supported on Linux (BLE communication is via BLuez + DBus).
## How to use example ## How to use example
@ -73,3 +79,72 @@ peer_ota_addr_type=1 peer_ota_addr=xx:xx:xx:xx:xx:xx peer_id_addr_type=1 peer_id
``` ```
## Running Python Utility
```
python bleprph_test.py
```
## Python Utility Output
This is this output seen on the python side on successful connection:
```
discovering adapter...
bluetooth adapter discovered
powering on adapter...
bluetooth adapter powered on
Started Discovery
Connecting to device...
Connected to device
Services
[dbus.String(u'00001801-0000-1000-8000-00805f9b34fb', variant_level=1), dbus.String(u'59462f12-9543-9999-12c8-58b459a2712d', variant_level=1)]
Characteristics retrieved
Characteristic: /org/bluez/hci0/dev_xx_xx_xx_xx_xx_xx/service000a/char000b
Characteristic UUID: 5c3a659e-897e-45e1-b016-007107c96df6
Value: dbus.Array([dbus.Byte(45), dbus.Byte(244), dbus.Byte(81), dbus.Byte(88)], signature=dbus.Signature('y'))
Properties: : dbus.Array([dbus.String(u'read')], signature=dbus.Signature('s'), variant_level=1)
Characteristic: /org/bluez/hci0/dev_xx_xx_xx_xx_xx_xx/service000a/char000d
Characteristic UUID: 5c3a659e-897e-45e1-b016-007107c96df7
Value: dbus.Array([dbus.Byte(0)], signature=dbus.Signature('y'))
Properties: : dbus.Array([dbus.String(u'read'), dbus.String(u'write')], signature=dbus.Signature('s'), variant_level=1)
Characteristic: /org/bluez/hci0/dev_xx_xx_xx_xx_xx_xx/service0006/char0007
Characteristic UUID: 00002a05-0000-1000-8000-00805f9b34fb
Value: None
Properties: : dbus.Array([dbus.String(u'indicate')], signature=dbus.Signature('s'), variant_level=1)
Characteristics after write operation
Characteristic: /org/bluez/hci0/dev_xx_xx_xx_xx_xx_xx/service000a/char000b
Characteristic UUID: 5c3a659e-897e-45e1-b016-007107c96df6
Value: dbus.Array([dbus.Byte(45), dbus.Byte(244), dbus.Byte(81), dbus.Byte(88)], signature=dbus.Signature('y'))
Properties: : dbus.Array([dbus.String(u'read')], signature=dbus.Signature('s'), variant_level=1)
Characteristic: /org/bluez/hci0/dev_xx_xx_xx_xx_xx_xx/service000a/char000d
Characteristic UUID: 5c3a659e-897e-45e1-b016-007107c96df7
Value: dbus.Array([dbus.Byte(65)], signature=dbus.Signature('y'))
Properties: : dbus.Array([dbus.String(u'read'), dbus.String(u'write')], signature=dbus.Signature('s'), variant_level=1)
Characteristic: /org/bluez/hci0/dev_xx_xx_xx_xx_xx_xx/service0006/char0007
Characteristic UUID: 00002a05-0000-1000-8000-00805f9b34fb
Value: None
Properties: : dbus.Array([dbus.String(u'indicate')], signature=dbus.Signature('s'), variant_level=1)
exiting from test...
disconnecting device...
device disconnected
powering off adapter...
bluetooth adapter powered off
```
## Note
* NVS support is not yet integrated to bonding. So, for now, bonding is not persistent across reboot.

View File

@ -0,0 +1,169 @@
#!/usr/bin/env python
#
# Copyright 2019 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import sys
import re
from threading import Thread
import subprocess
try:
# This environment variable is expected on the host machine
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
except ImportError as e:
print(e)
print("Try `export TEST_FW_PATH=$IDF_PATH/tools/tiny-test-fw` for resolving the issue")
print("Try `pip install -r $IDF_PATH/tools/tiny-test-fw/requirements.txt` for resolving the issue")
import IDF
try:
import lib_ble_client
except ImportError:
lib_ble_client_path = os.getenv("IDF_PATH") + "/tools/ble"
if lib_ble_client_path and lib_ble_client_path not in sys.path:
sys.path.insert(0, lib_ble_client_path)
import lib_ble_client
import Utility
# When running on local machine execute the following before running this script
# > make app bootloader
# > make print_flash_cmd | tail -n 1 > build/download.config
# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw
def bleprph_client_task(dut_addr, dut):
interface = 'hci0'
ble_devname = 'nimble-bleprph'
srv_uuid = '2f12'
# Get BLE client module
ble_client_obj = lib_ble_client.BLE_Bluez_Client(interface, devname=ble_devname, devaddr=dut_addr)
if not ble_client_obj:
raise RuntimeError("Failed to get DBus-Bluez object")
# Discover Bluetooth Adapter and power on
is_adapter_set = ble_client_obj.set_adapter()
if not is_adapter_set:
raise RuntimeError("Adapter Power On failed !!")
# Connect BLE Device
is_connected = ble_client_obj.connect()
if not is_connected:
Utility.console_log("Connection to device ", ble_devname, "failed !!")
# Call disconnect to perform cleanup operations before exiting application
ble_client_obj.disconnect()
return
# Check dut responses
dut.expect("GAP procedure initiated: advertise;", timeout=30)
# Read Services
services_ret = ble_client_obj.get_services(srv_uuid)
if services_ret:
print("\nServices\n")
print(services_ret)
else:
print("Failure: Read Services failed")
ble_client_obj.disconnect()
return
# Read Characteristics
chars_ret = {}
chars_ret = ble_client_obj.read_chars()
if chars_ret:
Utility.console_log("\nCharacteristics retrieved")
for path, props in chars_ret.items():
print("\n\tCharacteristic: ", path)
print("\tCharacteristic UUID: ", props[2])
print("\tValue: ", props[0])
print("\tProperties: : ", props[1])
else:
print("Failure: Read Characteristics failed")
ble_client_obj.disconnect()
return
'''
Write Characteristics
- write 'A' to characteristic with write permission
'''
chars_ret_on_write = {}
chars_ret_on_write = ble_client_obj.write_chars('A')
if chars_ret_on_write:
Utility.console_log("\nCharacteristics after write operation")
for path, props in chars_ret_on_write.items():
print("\n\tCharacteristic:", path)
print("\tCharacteristic UUID: ", props[2])
print("\tValue:", props[0])
print("\tProperties: : ", props[1])
else:
print("Failure: Write Characteristics failed")
ble_client_obj.disconnect()
return
# Call disconnect to perform cleanup operations before exiting application
ble_client_obj.disconnect()
@IDF.idf_example_test(env_tag="Example_WIFI_BT")
def test_example_app_ble_peripheral(env, extra_data):
"""
Steps:
1. Discover Bluetooth Adapter and Power On
2. Connect BLE Device
3. Read Services
4. Read Characteristics
5. Write Characteristics
"""
try:
# Acquire DUT
dut = env.get_dut("bleprph", "examples/bluetooth/nimble/bleprph")
# Get binary file
binary_file = os.path.join(dut.app.binary_path, "bleprph.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("bleprph_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("bleprph_bin_size", bin_size // 1024)
# Upload binary and start testing
Utility.console_log("Starting bleprph simple example test app")
dut.start_app()
subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*'])
# Get device address from dut
dut_addr = dut.expect(re.compile(r"Device Address: ([a-fA-F0-9:]+)"), timeout=30)[0]
# Starting a py-client in a separate thread
thread1 = Thread(target=bleprph_client_task, args=(dut_addr,dut,))
thread1.start()
thread1.join()
# Check dut responses
dut.expect("connection established; status=0", timeout=30)
dut.expect("disconnect;", timeout=30)
except Exception as e:
sys.exit(e)
if __name__ == '__main__':
test_example_app_ble_peripheral()

805
tools/ble/lib_ble_client.py Normal file
View File

@ -0,0 +1,805 @@
#!/usr/bin/env python
#
# Copyright 2019 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# DBus-Bluez BLE library
from __future__ import print_function
import sys
import time
try:
from future.moves.itertools import zip_longest
import dbus
import dbus.mainloop.glib
from gi.repository import GLib
except ImportError as e:
if 'linux' not in sys.platform:
sys.exit("Error: Only supported on Linux platform")
print(e)
print("Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue")
print("Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue")
raise
import lib_gatt
import lib_gap
srv_added_old_cnt = 0
srv_added_new_cnt = 0
blecent_retry_check_cnt = 0
verify_service_cnt = 0
verify_readchars_cnt = 0
blecent_adv_uuid = '1811'
iface_added = False
gatt_app_obj_check = False
gatt_app_reg_check = False
adv_data_check = False
adv_reg_check = False
read_req_check = False
write_req_check = False
subscribe_req_check = False
ble_hr_chrc = False
DISCOVERY_START = False
TEST_CHECKS_PASS = False
ADV_STOP = False
SERVICES_RESOLVED = False
SERVICE_UUID_FOUND = False
BLUEZ_SERVICE_NAME = 'org.bluez'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
ADAPTER_IFACE = 'org.bluez.Adapter1'
DEVICE_IFACE = 'org.bluez.Device1'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
ADAPTER_ON = False
DEVICE_CONNECTED = False
GATT_APP_REGISTERED = False
ADV_REGISTERED = False
ADV_ACTIVE_INSTANCE = False
CHRC_VALUE_CNT = False
# Set up the main loop.
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
dbus.mainloop.glib.threads_init()
# Set up the event main loop.
event_loop = GLib.MainLoop()
def set_props_status(props):
"""
Set Adapter status if it is powered on or off
"""
global ADAPTER_ON, SERVICES_RESOLVED, GATT_OBJ_REMOVED, GATT_APP_REGISTERED, \
ADV_REGISTERED, ADV_ACTIVE_INSTANCE, DEVICE_CONNECTED, CHRC_VALUE, CHRC_VALUE_CNT
is_service_uuid = False
# Signal caught for change in Adapter Powered property
if 'Powered' in props:
if props['Powered'] == 1:
ADAPTER_ON = True
else:
ADAPTER_ON = False
event_loop.quit()
elif 'ServicesResolved' in props:
if props['ServicesResolved'] == 1:
SERVICES_RESOLVED = True
else:
SERVICES_RESOLVED = False
elif 'UUIDs' in props:
# Signal caught for add/remove GATT data having service uuid
for uuid in props['UUIDs']:
if blecent_adv_uuid in uuid:
is_service_uuid = True
if not is_service_uuid:
# Signal caught for removing GATT data having service uuid
# and for unregistering GATT application
GATT_APP_REGISTERED = False
lib_gatt.GATT_APP_OBJ = False
elif 'ActiveInstances' in props:
# Signal caught for Advertising - add/remove Instances property
if props['ActiveInstances'] == 1:
ADV_ACTIVE_INSTANCE = True
elif props['ActiveInstances'] == 0:
ADV_ACTIVE_INSTANCE = False
ADV_REGISTERED = False
lib_gap.ADV_OBJ = False
elif 'Connected' in props:
# Signal caught for device connect/disconnect
if props['Connected'] is True:
DEVICE_CONNECTED = True
event_loop.quit()
else:
DEVICE_CONNECTED = False
elif 'Value' in props:
# Signal caught for change in chars value
if ble_hr_chrc:
CHRC_VALUE_CNT += 1
print(props['Value'])
if CHRC_VALUE_CNT == 10:
event_loop.quit()
def props_change_handler(iface, changed_props, invalidated):
"""
PropertiesChanged Signal handler.
Catch and print information about PropertiesChanged signal.
"""
if iface == ADAPTER_IFACE:
set_props_status(changed_props)
if iface == LE_ADVERTISING_MANAGER_IFACE:
set_props_status(changed_props)
if iface == DEVICE_IFACE:
set_props_status(changed_props)
if iface == GATT_CHRC_IFACE:
set_props_status(changed_props)
class BLE_Bluez_Client:
def __init__(self, iface, devname=None, devaddr=None):
self.bus = None
self.device = None
self.devname = devname
self.devaddr = devaddr
self.iface = iface
self.ble_objs = None
self.props_iface_obj = None
self.adapter_path = []
self.adapter = None
self.services = []
self.srv_uuid = []
self.chars = {}
self.char_uuid = []
try:
self.bus = dbus.SystemBus()
om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE)
self.ble_objs = om_iface_obj.GetManagedObjects()
except Exception as e:
print(e)
def __del__(self):
try:
print("Test Exit")
except Exception as e:
print(e)
sys.exit(1)
def set_adapter(self):
'''
Discover Bluetooth Adapter
Power On Bluetooth Adapter
'''
try:
print("discovering adapter...")
for path, interfaces in self.ble_objs.items():
adapter = interfaces.get(ADAPTER_IFACE)
if adapter is not None:
if path.endswith(self.iface):
self.adapter = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), ADAPTER_IFACE)
# Create Properties Interface object only after adapter is found
self.props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE)
self.adapter_path = [path, interfaces]
# Check adapter status - power on/off
set_props_status(interfaces[ADAPTER_IFACE])
break
if self.adapter is None:
raise RuntimeError("\nError: bluetooth adapter not found")
if self.props_iface_obj is None:
raise RuntimeError("\nError: properties interface not found")
print("bluetooth adapter discovered")
self.props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
# Check if adapter is already powered on
if ADAPTER_ON:
print("bluetooth adapter is already on")
return True
# Power On Adapter
print("powering on adapter...")
self.props_iface_obj.Set(ADAPTER_IFACE, "Powered", dbus.Boolean(1))
event_loop.run()
if ADAPTER_ON:
print("bluetooth adapter powered on")
return True
else:
print("Failure: bluetooth adapter not powered on")
return False
except Exception as e:
print(e)
sys.exit(1)
def connect(self):
'''
Connect to the device discovered
Retry 10 times to discover and connect to device
'''
global DISCOVERY_START
device_found = False
try:
self.adapter.StartDiscovery()
print("\nStarted Discovery")
DISCOVERY_START = True
for retry_cnt in range(10,0,-1):
try:
if self.device is None:
print("\nConnecting to device...")
# Wait for device to be discovered
time.sleep(5)
device_found = self.get_device()
if device_found:
self.device.Connect(dbus_interface=DEVICE_IFACE)
event_loop.quit()
print("\nConnected to device")
return True
except Exception as e:
print(e)
print("\nRetries left", retry_cnt - 1)
continue
# Device not found
return False
except Exception as e:
print(e)
self.device = None
return False
def get_device(self):
'''
Discover device based on device name
and device address and connect
'''
dev_path = None
om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE)
self.ble_objs = om_iface_obj.GetManagedObjects()
for path, interfaces in self.ble_objs.items():
if DEVICE_IFACE not in interfaces.keys():
continue
device_addr_iface = (path.replace('_', ':')).lower()
dev_addr = self.devaddr.lower()
if dev_addr in device_addr_iface and \
interfaces[DEVICE_IFACE].get("Name") == self.devname:
dev_path = path
break
if dev_path is None:
print("\nBLE device not found")
return False
device_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path), DBUS_PROP_IFACE)
device_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
self.device = self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path)
return True
def srvc_iface_added_handler(self, path, interfaces):
'''
Add services found as lib_ble_client obj
'''
if self.device and path.startswith(self.device.object_path):
if GATT_SERVICE_IFACE in interfaces.keys():
service = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
uuid = service.Get(GATT_SERVICE_IFACE, 'UUID', dbus_interface=DBUS_PROP_IFACE)
if uuid not in self.srv_uuid:
self.srv_uuid.append(uuid)
if path not in self.services:
self.services.append(path)
def verify_get_services(self):
global SERVICE_SCAN_FAIL, verify_service_cnt
verify_service_cnt += 1
if iface_added and self.services and SERVICES_RESOLVED:
event_loop.quit()
if verify_service_cnt == 10:
event_loop.quit()
def verify_service_uuid_found(self):
'''
Verify service uuid found
'''
global SERVICE_UUID_FOUND
srv_uuid_found = [uuid for uuid in self.srv_uuid if service_uuid in uuid]
if srv_uuid_found:
SERVICE_UUID_FOUND = True
def get_services(self, srv_uuid=None):
'''
Retrieve Services found in the device connected
'''
global service_uuid, iface_added, SERVICE_UUID_FOUND
service_uuid = srv_uuid
iface_added = False
SERVICE_UUID_FOUND = False
try:
om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE)
self.ble_objs = om_iface_obj.GetManagedObjects()
for path, interfaces in self.ble_objs.items():
self.srvc_iface_added_handler(path, interfaces)
# If services not found, then they may not have been added yet on dbus
if not self.services:
iface_added = True
GLib.timeout_add_seconds(2, self.verify_get_services)
om_iface_obj.connect_to_signal('InterfacesAdded', self.srvc_iface_added_handler)
event_loop.run()
if service_uuid:
self.verify_service_uuid_found()
if not SERVICE_UUID_FOUND:
raise Exception("Service with uuid: %s not found !!!" % service_uuid)
return self.srv_uuid
except Exception as e:
print("Error: ", e)
return False
def chrc_iface_added_handler(self, path, interfaces):
'''
Add services found as lib_ble_client obj
'''
global chrc, chrc_discovered
chrc_val = None
if self.device and path.startswith(self.device.object_path):
if GATT_CHRC_IFACE in interfaces.keys():
chrc = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
dbus_interface=DBUS_PROP_IFACE)
chrc_flags = chrc_props['Flags']
if 'read' in chrc_flags:
chrc_val = chrc.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
uuid = chrc_props['UUID']
self.chars[path] = chrc_val, chrc_flags, uuid
def verify_get_chars(self):
global verify_readchars_cnt
verify_readchars_cnt += 1
if iface_added and self.chars:
event_loop.quit()
if verify_readchars_cnt == 10:
event_loop.quit()
def read_chars(self):
'''
Read characteristics found in the device connected
'''
global iface_added, chrc_discovered
chrc_discovered = False
iface_added = False
try:
om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE)
self.ble_objs = om_iface_obj.GetManagedObjects()
for path, interfaces in self.ble_objs.items():
self.chrc_iface_added_handler(path, interfaces)
# If chars not found, then they have not been added yet to interface
if not self.chars:
iface_added = True
GLib.timeout_add_seconds(2, self.verify_get_chars)
om_iface_obj.connect_to_signal('InterfacesAdded', self.chars_iface_added_handler)
event_loop.run()
return self.chars
except Exception as e:
print("Error: ", e)
return False
def write_chars(self, write_val):
'''
Write characteristics to the device connected
'''
chrc = None
chrc_val = None
char_write_props = False
try:
for path, props in self.chars.items():
if 'write' in props[1]: # check permission
char_write_props = True
chrc = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
chrc.WriteValue(write_val,{},dbus_interface=GATT_CHRC_IFACE)
if 'read' in props[1]:
chrc_val = chrc.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
else:
print("Warning: Cannot read value. Characteristic does not have read permission.")
if not (ord(write_val) == int(chrc_val[0])):
print("\nWrite Failed")
return False
self.chars[path] = chrc_val, props[1], props[2] # update value
if not char_write_props:
print("Failure: Cannot perform write operation. Characteristic does not have write permission.")
return False
return self.chars
except Exception as e:
print(e)
return False
def hr_update_simulation(self, hr_srv_uuid, hr_char_uuid):
'''
Start Notifications
Retrieve updated value
Stop Notifications
'''
global ble_hr_chrc
srv_path = None
chrc = None
uuid = None
chrc_path = None
chars_ret = None
ble_hr_chrc = True
try:
# Get HR Measurement characteristic
services = list(zip_longest(self.srv_uuid, self.services))
for uuid, path in services:
if hr_srv_uuid in uuid:
srv_path = path
break
if srv_path is None:
print("Failure: HR UUID:", hr_srv_uuid, "not found")
return False
chars_ret = self.read_chars()
for path, props in chars_ret.items():
if path.startswith(srv_path):
chrc = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
chrc_path = path
if hr_char_uuid in props[2]: # uuid
break
if chrc is None:
print("Failure: Characteristics for service: ", srv_path, "not found")
return False
# Subscribe to notifications
print("\nSubscribe to notifications: On")
chrc.StartNotify(dbus_interface=GATT_CHRC_IFACE)
chrc_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, chrc_path), DBUS_PROP_IFACE)
chrc_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
event_loop.run()
chrc.StopNotify(dbus_interface=GATT_CHRC_IFACE)
time.sleep(2)
print("\nSubscribe to notifications: Off")
ble_hr_chrc = False
return True
except Exception as e:
print(e)
return False
def create_gatt_app(self):
'''
Create GATT data
Register GATT Application
'''
global gatt_app_obj, gatt_manager_iface_obj
gatt_app_obj = None
gatt_manager_iface_obj = None
try:
gatt_app_obj = lib_gatt.Application(self.bus, self.adapter_path[0])
gatt_manager_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME,self.adapter_path[0]), GATT_MANAGER_IFACE)
gatt_manager_iface_obj.RegisterApplication(gatt_app_obj, {},
reply_handler=self.gatt_app_handler,
error_handler=self.gatt_app_error_handler)
return True
except Exception as e:
print(e)
return False
def gatt_app_handler(self):
'''
GATT Application Register success handler
'''
global GATT_APP_REGISTERED
GATT_APP_REGISTERED = True
def gatt_app_error_handler(self, error):
'''
GATT Application Register error handler
'''
global GATT_APP_REGISTERED
GATT_APP_REGISTERED = False
def start_advertising(self, adv_host_name, adv_iface_index, adv_type, adv_uuid):
'''
Create Advertising data
Register Advertisement
Start Advertising
'''
global le_adv_obj, le_adv_manager_iface_obj
le_adv_obj = None
le_adv_manager_iface_obj = None
le_adv_iface_path = None
try:
print("Advertising started")
gatt_app_ret = self.create_gatt_app()
if not gatt_app_ret:
return False
for path,interface in self.ble_objs.items():
if LE_ADVERTISING_MANAGER_IFACE in interface:
le_adv_iface_path = path
if le_adv_iface_path is None:
print('\n Cannot start advertising. LEAdvertisingManager1 Interface not found')
return False
le_adv_obj = lib_gap.Advertisement(self.bus, adv_iface_index, adv_type, adv_uuid, adv_host_name)
le_adv_manager_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, le_adv_iface_path), LE_ADVERTISING_MANAGER_IFACE)
le_adv_manager_iface_obj.RegisterAdvertisement(le_adv_obj.get_path(), {},
reply_handler=self.adv_handler,
error_handler=self.adv_error_handler)
GLib.timeout_add_seconds(2, self.verify_blecent)
event_loop.run()
if TEST_CHECKS_PASS:
return True
else:
return False
except Exception as e:
print("in Exception")
print(e)
return False
def adv_handler(self):
'''
Advertisement Register success handler
'''
global ADV_REGISTERED
ADV_REGISTERED = True
def adv_error_handler(self, error):
'''
Advertisement Register error handler
'''
global ADV_REGISTERED
ADV_REGISTERED = False
def verify_blecent(self):
"""
Verify blecent test commands are successful
"""
global blecent_retry_check_cnt, gatt_app_obj_check, gatt_app_reg_check,\
adv_data_check, adv_reg_check, read_req_check, write_req_check,\
subscribe_req_check, TEST_CHECKS_PASS
# Get device when connected
if not self.device:
om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE)
self.ble_objs = om_iface_obj.GetManagedObjects()
for path, interfaces in self.ble_objs.items():
if DEVICE_IFACE not in interfaces.keys():
continue
device_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE)
device_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
self.device = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
# Check for failures after 10 retries
if blecent_retry_check_cnt == 10:
# check for failures
if not gatt_app_obj_check:
print("Failure: GATT Data could not be created")
if not gatt_app_reg_check:
print("Failure: GATT Application could not be registered")
if not adv_data_check:
print("Failure: Advertising data could not be created")
if not adv_reg_check:
print("Failure: Advertisement could not be registered")
if not read_req_check:
print("Failure: Read Request not received")
if not write_req_check:
print("Failure: Write Request not received")
if not subscribe_req_check:
print("Failure: Subscribe Request not received")
# Blecent Test failed
TEST_CHECKS_PASS = False
if subscribe_req_check:
lib_gatt.alert_status_char_obj.StopNotify()
event_loop.quit()
return False # polling for checks will stop
# Check for success
if not gatt_app_obj_check and lib_gatt.GATT_APP_OBJ:
print("GATT Data created")
gatt_app_obj_check = True
if not gatt_app_reg_check and GATT_APP_REGISTERED:
print("GATT Application registered")
gatt_app_reg_check = True
if not adv_data_check and lib_gap.ADV_OBJ:
print("Advertising data created")
adv_data_check = True
if not adv_reg_check and ADV_REGISTERED and ADV_ACTIVE_INSTANCE:
print("Advertisement registered")
adv_reg_check = True
if not read_req_check and lib_gatt.CHAR_READ:
read_req_check = True
if not write_req_check and lib_gatt.CHAR_WRITE:
write_req_check = True
if not subscribe_req_check and lib_gatt.CHAR_SUBSCRIBE:
subscribe_req_check = True
# Increment retry count
blecent_retry_check_cnt += 1
if read_req_check and write_req_check and subscribe_req_check:
# all checks passed
# Blecent Test passed
TEST_CHECKS_PASS = True
lib_gatt.alert_status_char_obj.StopNotify()
event_loop.quit()
return False # polling for checks will stop
# Default return True - polling for checks will continue
return True
def verify_blecent_disconnect(self):
"""
Verify cleanup is successful
"""
global blecent_retry_check_cnt, gatt_app_obj_check, gatt_app_reg_check,\
adv_data_check, adv_reg_check, ADV_STOP
if blecent_retry_check_cnt == 0:
gatt_app_obj_check = False
gatt_app_reg_check = False
adv_data_check = False
adv_reg_check = False
# Check for failures after 10 retries
if blecent_retry_check_cnt == 10:
# check for failures
if not gatt_app_obj_check:
print("Warning: GATT Data could not be removed")
if not gatt_app_reg_check:
print("Warning: GATT Application could not be unregistered")
if not adv_data_check:
print("Warning: Advertising data could not be removed")
if not adv_reg_check:
print("Warning: Advertisement could not be unregistered")
# Blecent Test failed
ADV_STOP = False
event_loop.quit()
return False # polling for checks will stop
# Check for success
if not gatt_app_obj_check and not lib_gatt.GATT_APP_OBJ:
print("GATT Data removed")
gatt_app_obj_check = True
if not gatt_app_reg_check and not GATT_APP_REGISTERED:
print("GATT Application unregistered")
gatt_app_reg_check = True
if not adv_data_check and not adv_reg_check and not (ADV_REGISTERED or ADV_ACTIVE_INSTANCE or lib_gap.ADV_OBJ):
print("Advertising data removed")
print("Advertisement unregistered")
adv_data_check = True
adv_reg_check = True
# Increment retry count
blecent_retry_check_cnt += 1
if adv_reg_check:
# all checks passed
ADV_STOP = True
event_loop.quit()
return False # polling for checks will stop
# Default return True - polling for checks will continue
return True
def disconnect(self):
'''
Before application exits:
Advertisement is unregistered
Advertisement object created is removed from dbus
GATT Application is unregistered
GATT Application object created is removed from dbus
Disconnect device if connected
Adapter is powered off
'''
try:
global blecent_retry_check_cnt, DISCOVERY_START
blecent_retry_check_cnt = 0
print("\nexiting from test...")
if ADV_REGISTERED:
# Unregister Advertisement
le_adv_manager_iface_obj.UnregisterAdvertisement(le_adv_obj.get_path())
# Remove Advertising data
dbus.service.Object.remove_from_connection(le_adv_obj)
if GATT_APP_REGISTERED:
# Unregister GATT Application
gatt_manager_iface_obj.UnregisterApplication(gatt_app_obj.get_path())
# Remove GATT data
dbus.service.Object.remove_from_connection(gatt_app_obj)
GLib.timeout_add_seconds(2, self.verify_blecent_disconnect)
event_loop.run()
if ADV_STOP:
print("Stop Advertising status: ", ADV_STOP)
else:
print("Warning: Stop Advertising status: ", ADV_STOP)
# Disconnect device
if self.device:
print("disconnecting device...")
self.device.Disconnect(dbus_interface=DEVICE_IFACE)
if self.adapter:
self.adapter.RemoveDevice(self.device)
self.device = None
if DISCOVERY_START:
self.adapter.StopDiscovery()
DISCOVERY_START = False
# Power off Adapter
self.props_iface_obj.Set(ADAPTER_IFACE, "Powered", dbus.Boolean(0))
event_loop.run()
if not DEVICE_CONNECTED:
print("device disconnected")
else:
print("Warning: device could not be disconnected")
print("powering off adapter...")
if not ADAPTER_ON:
print("bluetooth adapter powered off")
else:
print("\nWarning: Bluetooth adapter not powered off")
except Exception as e:
print(e)
return False

87
tools/ble/lib_gap.py Normal file
View File

@ -0,0 +1,87 @@
#!/usr/bin/env python
#
# Copyright 2019 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Register Advertisement
from __future__ import print_function
import sys
try:
import dbus
import dbus.service
except ImportError as e:
if 'linux' not in sys.platform:
sys.exit("Error: Only supported on Linux platform")
print(e)
print("Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue")
print("Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue")
raise
ADV_OBJ = False
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
class InvalidArgsException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
class Advertisement(dbus.service.Object):
PATH_BASE = '/org/bluez/hci0/advertisement'
def __init__(self, bus, index, advertising_type, uuid, name):
self.path = self.PATH_BASE + str(index)
self.bus = bus
self.ad_type = advertising_type
self.service_uuids = [uuid]
self.local_name = dbus.String(name)
dbus.service.Object.__init__(self, bus, self.path)
def __del__(self):
pass
def get_properties(self):
properties = dict()
properties['Type'] = self.ad_type
if self.service_uuids is not None:
properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,
signature='s')
if self.local_name is not None:
properties['LocalName'] = dbus.String(self.local_name)
return {LE_ADVERTISEMENT_IFACE: properties}
def get_path(self):
return dbus.ObjectPath(self.path)
@dbus.service.method(DBUS_PROP_IFACE,
in_signature='s',
out_signature='a{sv}')
def GetAll(self, interface):
global ADV_OBJ
if interface != LE_ADVERTISEMENT_IFACE:
raise InvalidArgsException()
ADV_OBJ = True
return self.get_properties()[LE_ADVERTISEMENT_IFACE]
@dbus.service.method(LE_ADVERTISEMENT_IFACE,
in_signature='',
out_signature='')
def Release(self):
pass

412
tools/ble/lib_gatt.py Normal file
View File

@ -0,0 +1,412 @@
#!/usr/bin/env python
#
# Copyright 2019 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Creating GATT Application which then becomes available to remote devices.
from __future__ import print_function
import sys
try:
import dbus
import dbus.service
except ImportError as e:
if 'linux' not in sys.platform:
sys.exit("Error: Only supported on Linux platform")
print(e)
print("Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue")
print("Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue")
raise
alert_status_char_obj = None
GATT_APP_OBJ = False
CHAR_READ = False
CHAR_WRITE = False
CHAR_SUBSCRIBE = False
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'
class InvalidArgsException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
class NotSupportedException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.bluez.Error.NotSupported'
class Application(dbus.service.Object):
"""
org.bluez.GattApplication1 interface implementation
"""
def __init__(self, bus, path):
self.path = path
self.services = []
srv_obj = AlertNotificationService(bus, '0001')
self.add_service(srv_obj)
dbus.service.Object.__init__(self, bus, self.path)
def __del__(self):
pass
def get_path(self):
return dbus.ObjectPath(self.path)
def add_service(self, service):
self.services.append(service)
@dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
def GetManagedObjects(self):
global GATT_APP_OBJ
response = {}
for service in self.services:
response[service.get_path()] = service.get_properties()
chrcs = service.get_characteristics()
for chrc in chrcs:
response[chrc.get_path()] = chrc.get_properties()
descs = chrc.get_descriptors()
for desc in descs:
response[desc.get_path()] = desc.get_properties()
GATT_APP_OBJ = True
return response
def Release(self):
pass
class Service(dbus.service.Object):
"""
org.bluez.GattService1 interface implementation
"""
PATH_BASE = '/org/bluez/hci0/service'
def __init__(self, bus, index, uuid, primary=False):
self.path = self.PATH_BASE + str(index)
self.bus = bus
self.uuid = uuid
self.primary = primary
self.characteristics = []
dbus.service.Object.__init__(self, bus, self.path)
def get_properties(self):
return {
GATT_SERVICE_IFACE: {
'UUID': self.uuid,
'Primary': self.primary,
'Characteristics': dbus.Array(
self.get_characteristic_paths(),
signature='o')
}
}
def get_path(self):
return dbus.ObjectPath(self.path)
def add_characteristic(self, characteristic):
self.characteristics.append(characteristic)
def get_characteristic_paths(self):
result = []
for chrc in self.characteristics:
result.append(chrc.get_path())
return result
def get_characteristics(self):
return self.characteristics
@dbus.service.method(DBUS_PROP_IFACE,
in_signature='s',
out_signature='a{sv}')
def GetAll(self, interface):
if interface != GATT_SERVICE_IFACE:
raise InvalidArgsException()
return self.get_properties()[GATT_SERVICE_IFACE]
class Characteristic(dbus.service.Object):
"""
org.bluez.GattCharacteristic1 interface implementation
"""
def __init__(self, bus, index, uuid, flags, service):
self.path = service.path + '/char' + str(index)
self.bus = bus
self.uuid = uuid
self.service = service
self.flags = flags
self.value = [0]
self.descriptors = []
dbus.service.Object.__init__(self, bus, self.path)
def get_properties(self):
return {
GATT_CHRC_IFACE: {
'Service': self.service.get_path(),
'UUID': self.uuid,
'Flags': self.flags,
'Value': self.value,
'Descriptors': dbus.Array(self.get_descriptor_paths(), signature='o')
}
}
def get_path(self):
return dbus.ObjectPath(self.path)
def add_descriptor(self, descriptor):
self.descriptors.append(descriptor)
def get_descriptor_paths(self):
result = []
for desc in self.descriptors:
result.append(desc.get_path())
return result
def get_descriptors(self):
return self.descriptors
@dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}')
def GetAll(self, interface):
if interface != GATT_CHRC_IFACE:
raise InvalidArgsException()
return self.get_properties()[GATT_CHRC_IFACE]
@dbus.service.method(GATT_CHRC_IFACE, in_signature='a{sv}', out_signature='ay')
def ReadValue(self, options):
print('\nDefault ReadValue called, returning error')
raise NotSupportedException()
@dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
def WriteValue(self, value, options):
print('\nDefault WriteValue called, returning error')
raise NotSupportedException()
@dbus.service.method(GATT_CHRC_IFACE)
def StartNotify(self):
print('Default StartNotify called, returning error')
raise NotSupportedException()
@dbus.service.method(GATT_CHRC_IFACE)
def StopNotify(self):
print('Default StopNotify called, returning error')
raise NotSupportedException()
@dbus.service.signal(DBUS_PROP_IFACE,
signature='sa{sv}as')
def PropertiesChanged(self, interface, changed, invalidated):
print("\nProperties Changed")
class Descriptor(dbus.service.Object):
"""
org.bluez.GattDescriptor1 interface implementation
"""
def __init__(self, bus, index, uuid, flags, characteristic):
self.path = characteristic.path + '/desc' + str(index)
self.bus = bus
self.uuid = uuid
self.flags = flags
self.chrc = characteristic
dbus.service.Object.__init__(self, bus, self.path)
def get_properties(self):
return {
GATT_DESC_IFACE: {
'Characteristic': self.chrc.get_path(),
'UUID': self.uuid,
'Flags': self.flags,
}
}
def get_path(self):
return dbus.ObjectPath(self.path)
@dbus.service.method(DBUS_PROP_IFACE,
in_signature='s',
out_signature='a{sv}')
def GetAll(self, interface):
if interface != GATT_DESC_IFACE:
raise InvalidArgsException()
return self.get_properties()[GATT_DESC_IFACE]
@dbus.service.method(GATT_DESC_IFACE, in_signature='a{sv}', out_signature='ay')
def ReadValue(self, options):
print('Default ReadValue called, returning error')
raise NotSupportedException()
@dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')
def WriteValue(self, value, options):
print('Default WriteValue called, returning error')
raise NotSupportedException()
class AlertNotificationService(Service):
TEST_SVC_UUID = '00001811-0000-1000-8000-00805f9b34fb'
def __init__(self, bus, index):
global alert_status_char_obj
Service.__init__(self, bus, index, self.TEST_SVC_UUID, primary=True)
self.add_characteristic(SupportedNewAlertCategoryCharacteristic(bus, '0001', self))
self.add_characteristic(AlertNotificationControlPointCharacteristic(bus, '0002', self))
alert_status_char_obj = UnreadAlertStatusCharacteristic(bus, '0003', self)
self.add_characteristic(alert_status_char_obj)
class SupportedNewAlertCategoryCharacteristic(Characteristic):
SUPPORT_NEW_ALERT_UUID = '00002A47-0000-1000-8000-00805f9b34fb'
def __init__(self, bus, index, service):
Characteristic.__init__(
self, bus, index,
self.SUPPORT_NEW_ALERT_UUID,
['read'],
service)
self.value = [dbus.Byte(2)]
def ReadValue(self, options):
global CHAR_READ
CHAR_READ = True
val_list = []
for val in self.value:
val_list.append(dbus.Byte(val))
print("Read Request received\n", "\tSupportedNewAlertCategoryCharacteristic")
print("\tValue:", "\t", val_list)
return val_list
class AlertNotificationControlPointCharacteristic(Characteristic):
ALERT_NOTIF_UUID = '00002A44-0000-1000-8000-00805f9b34fb'
def __init__(self, bus, index, service):
Characteristic.__init__(
self, bus, index,
self.ALERT_NOTIF_UUID,
['read','write'],
service)
self.value = [dbus.Byte(0)]
def ReadValue(self, options):
val_list = []
for val in self.value:
val_list.append(dbus.Byte(val))
print("Read Request received\n", "\tAlertNotificationControlPointCharacteristic")
print("\tValue:", "\t", val_list)
return val_list
def WriteValue(self, value, options):
global CHAR_WRITE
CHAR_WRITE = True
print("Write Request received\n", "\tAlertNotificationControlPointCharacteristic")
print("\tCurrent value:", "\t", self.value)
val_list = []
for val in value:
val_list.append(val)
self.value = val_list
# Check if new value is written
print("\tNew value:", "\t", self.value)
if not self.value == value:
print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value)
class UnreadAlertStatusCharacteristic(Characteristic):
UNREAD_ALERT_STATUS_UUID = '00002A45-0000-1000-8000-00805f9b34fb'
def __init__(self, bus, index, service):
Characteristic.__init__(
self, bus, index,
self.UNREAD_ALERT_STATUS_UUID,
['read', 'write', 'notify'],
service)
self.value = [dbus.Byte(0)]
self.cccd_obj = ClientCharacteristicConfigurationDescriptor(bus, '0001', self)
self.add_descriptor(self.cccd_obj)
self.notifying = False
def StartNotify(self):
global CHAR_SUBSCRIBE
CHAR_SUBSCRIBE = True
if self.notifying:
print('\nAlready notifying, nothing to do')
return
self.notifying = True
print("\nNotify Started")
self.cccd_obj.WriteValue([dbus.Byte(1), dbus.Byte(0)])
self.cccd_obj.ReadValue()
def StopNotify(self):
if not self.notifying:
print('\nNot notifying, nothing to do')
return
self.notifying = False
print("\nNotify Stopped")
def ReadValue(self, options):
print("Read Request received\n", "\tUnreadAlertStatusCharacteristic")
val_list = []
for val in self.value:
val_list.append(dbus.Byte(val))
print("\tValue:", "\t", val_list)
return val_list
def WriteValue(self, value, options):
print("Write Request received\n", "\tUnreadAlertStatusCharacteristic")
val_list = []
for val in value:
val_list.append(val)
self.value = val_list
# Check if new value is written
print("\tNew value:", "\t", self.value)
if not self.value == value:
print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value)
class ClientCharacteristicConfigurationDescriptor(Descriptor):
CCCD_UUID = '00002902-0000-1000-8000-00805f9b34fb'
def __init__(self, bus, index, characteristic):
self.value = [dbus.Byte(0)]
Descriptor.__init__(
self, bus, index,
self.CCCD_UUID,
['read', 'write'],
characteristic)
def ReadValue(self):
print("\tValue on read:", "\t", self.value)
return self.value
def WriteValue(self, value):
val_list = []
for val in value:
val_list.append(val)
self.value = val_list
# Check if new value is written
print("New value on write:", "\t", self.value)
if not self.value == value:
print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value)

View File

@ -0,0 +1,3 @@
future
dbus-python
pygobject