mirror of
https://github.com/espressif/esp-idf.git
synced 2024-10-05 20:47:46 -04:00
713 lines
28 KiB
Python
713 lines
28 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
# Copyright 2019 Espressif Systems (Shanghai) PTE LTD
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
# DBus-Bluez BLE library
|
|
|
|
from __future__ import print_function
|
|
|
|
import sys
|
|
import time
|
|
|
|
try:
|
|
import dbus
|
|
import dbus.mainloop.glib
|
|
from gi.repository import GLib
|
|
except ImportError as e:
|
|
if 'linux' not in sys.platform:
|
|
raise e
|
|
print(e)
|
|
print('Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue')
|
|
print('Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue')
|
|
raise
|
|
|
|
from . import lib_gap, lib_gatt
|
|
|
|
BLUEZ_SERVICE_NAME = 'org.bluez'
|
|
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
|
|
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
|
|
|
|
ADAPTER_IFACE = 'org.bluez.Adapter1'
|
|
DEVICE_IFACE = 'org.bluez.Device1'
|
|
|
|
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
|
|
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
|
|
|
|
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
|
|
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
|
|
|
|
|
|
class DBusException(dbus.exceptions.DBusException):
|
|
pass
|
|
|
|
|
|
class Characteristic:
|
|
def __init__(self):
|
|
self.iface = None
|
|
self.path = None
|
|
self.props = None
|
|
|
|
|
|
class Service:
|
|
def __init__(self):
|
|
self.iface = None
|
|
self.path = None
|
|
self.props = None
|
|
self.chars = []
|
|
|
|
|
|
class Device:
|
|
def __init__(self):
|
|
self.iface = None
|
|
self.path = None
|
|
self.props = None
|
|
self.name = None
|
|
self.addr = None
|
|
self.services = []
|
|
|
|
|
|
class Adapter:
|
|
def __init__(self):
|
|
self.iface = None
|
|
self.path = None
|
|
self.props = None
|
|
|
|
|
|
class BLE_Bluez_Client:
|
|
def __init__(self, iface=None):
|
|
self.bus = None
|
|
self.hci_iface = iface
|
|
self.adapter = Adapter()
|
|
self.device = None
|
|
self.gatt_app = None
|
|
self.gatt_mgr = None
|
|
self.mainloop = None
|
|
self.loop_cnt = 0
|
|
|
|
try:
|
|
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
|
self.bus = dbus.SystemBus()
|
|
except dbus.exceptions.DBusException as dbus_err:
|
|
raise DBusException('Failed to initialise client: {}'.format(dbus_err))
|
|
except Exception as err:
|
|
raise Exception('Failed to initialise client: {}'.format(err))
|
|
|
|
def __del__(self):
|
|
try:
|
|
# Cleanup
|
|
self.disconnect()
|
|
print('Test Exit')
|
|
except Exception as e:
|
|
print(e)
|
|
|
|
def set_adapter(self):
|
|
'''
|
|
Discover Bluetooth Adapter
|
|
Power On Bluetooth Adapter
|
|
'''
|
|
try:
|
|
print('discovering adapter')
|
|
dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
|
|
dbus_objs = dbus_obj_mgr.GetManagedObjects()
|
|
for path, interfaces in dbus_objs.items():
|
|
adapter = interfaces.get(ADAPTER_IFACE)
|
|
if adapter is not None and path.endswith(self.hci_iface):
|
|
self.adapter.iface = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), ADAPTER_IFACE)
|
|
self.adapter.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE)
|
|
self.adapter.path = path
|
|
break
|
|
|
|
if self.adapter.iface is None:
|
|
print('bluetooth adapter not found')
|
|
return False
|
|
|
|
print('bluetooth adapter discovered')
|
|
print('checking if bluetooth adapter is already powered on')
|
|
# Check if adapter is already powered on
|
|
powered = self.adapter.props.Get(ADAPTER_IFACE, 'Powered')
|
|
if powered == 1:
|
|
print('adapter already powered on')
|
|
return True
|
|
# Power On Adapter
|
|
print('powering on adapter')
|
|
self.adapter.props.Set(ADAPTER_IFACE, 'Powered', dbus.Boolean(1))
|
|
# Check if adapter is powered on
|
|
print('checking if adapter is powered on')
|
|
for cnt in range(10, 0, -1):
|
|
time.sleep(5)
|
|
powered_on = self.adapter.props.Get(ADAPTER_IFACE, 'Powered')
|
|
if powered_on == 1:
|
|
# Set adapter props again with powered on value
|
|
self.adapter.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, self.adapter.path), DBUS_PROP_IFACE)
|
|
print('bluetooth adapter powered on')
|
|
return True
|
|
print('number of retries left({})'.format(cnt - 1))
|
|
|
|
# Adapter not powered on
|
|
print('bluetooth adapter not powered on')
|
|
return False
|
|
|
|
except Exception as err:
|
|
raise Exception('Failed to set adapter: {}'.format(err))
|
|
|
|
def connect(self, devname=None, devaddr=None):
|
|
'''
|
|
Start Discovery and Connect to the device
|
|
'''
|
|
try:
|
|
device_found = None
|
|
start_discovery = False
|
|
self.device = Device()
|
|
|
|
discovery_val = self.adapter.props.Get(ADAPTER_IFACE, 'Discovering')
|
|
# Start Discovery
|
|
if discovery_val == 0:
|
|
print('starting discovery')
|
|
self.adapter.iface.StartDiscovery()
|
|
start_discovery = True
|
|
|
|
for cnt in range(10, 0, -1):
|
|
time.sleep(5)
|
|
discovery_val = self.adapter.props.Get(ADAPTER_IFACE, 'Discovering')
|
|
if discovery_val == 1:
|
|
print('start discovery successful')
|
|
break
|
|
print('number of retries left ({})'.format(cnt - 1))
|
|
|
|
if discovery_val == 0:
|
|
print('start discovery failed')
|
|
return False
|
|
|
|
# Get device
|
|
for cnt in range(10, 0, -1):
|
|
# Wait for device to be discovered
|
|
time.sleep(5)
|
|
device_found = self.get_device(
|
|
devname=devname,
|
|
devaddr=devaddr)
|
|
if device_found:
|
|
break
|
|
# Retry
|
|
print('number of retries left ({})'.format(cnt - 1))
|
|
|
|
if not device_found:
|
|
print('expected device {} [ {} ] not found'.format(devname, devaddr))
|
|
return False
|
|
|
|
# Connect to expected device found
|
|
print('connecting to device {} [ {} ] '.format(self.device.name, self.device.addr))
|
|
self.device.iface.Connect(dbus_interface=DEVICE_IFACE)
|
|
for cnt in range(10, 0, -1):
|
|
time.sleep(5)
|
|
connected = self.device.props.Get(DEVICE_IFACE, 'Connected')
|
|
if connected == 1:
|
|
# Set device props again with connected on value
|
|
self.device.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, self.device.path), DBUS_PROP_IFACE)
|
|
print('connected to device with iface {}'.format(self.device.path))
|
|
return True
|
|
print('number of retries left({})'.format(cnt - 1))
|
|
|
|
# Device not connected
|
|
print('connection to device failed')
|
|
return False
|
|
|
|
except Exception as err:
|
|
raise Exception('Connect to device failed : {}'.format(err))
|
|
finally:
|
|
try:
|
|
if start_discovery:
|
|
print('stopping discovery')
|
|
self.adapter.iface.StopDiscovery()
|
|
for cnt in range(10, 0, -1):
|
|
time.sleep(5)
|
|
discovery_val = self.adapter.props.Get(ADAPTER_IFACE, 'Discovering')
|
|
if discovery_val == 0:
|
|
print('stop discovery successful')
|
|
break
|
|
print('number of retries left ({})'.format(cnt - 1))
|
|
if discovery_val == 1:
|
|
print('stop discovery failed')
|
|
except dbus.exceptions.DBusException as dbus_err:
|
|
print('Warning: Failure during cleanup for device connection : {}'.format(dbus_err))
|
|
|
|
def get_device(self, devname=None, devaddr=None):
|
|
'''
|
|
Get device based on device name
|
|
and device address and connect to device
|
|
'''
|
|
dev_path = None
|
|
expected_device_addr = devaddr.lower()
|
|
expected_device_name = devname.lower()
|
|
|
|
print('checking if expected device {} [ {} ] is present'.format(devname, devaddr))
|
|
|
|
dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
|
|
dbus_objs = dbus_obj_mgr.GetManagedObjects()
|
|
|
|
# Check if expected device is present
|
|
for path, interfaces in dbus_objs.items():
|
|
if DEVICE_IFACE not in interfaces.keys():
|
|
continue
|
|
|
|
# Check expected device address is received device address
|
|
received_device_addr_path = (path.replace('_', ':')).lower()
|
|
if expected_device_addr not in received_device_addr_path:
|
|
continue
|
|
|
|
device_props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE)
|
|
received_device_name = device_props.Get(DEVICE_IFACE, 'Name').lower()
|
|
|
|
# Check expected device name is received device name
|
|
if expected_device_name == received_device_name:
|
|
# Set device iface path
|
|
dev_path = path
|
|
break
|
|
|
|
if not dev_path:
|
|
print('\nBLE device not found')
|
|
return False
|
|
|
|
print('device {} [ {} ] found'.format(devname, devaddr))
|
|
|
|
# Set device details
|
|
self.device.iface = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path), DEVICE_IFACE)
|
|
self.device.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path), DBUS_PROP_IFACE)
|
|
self.device.path = dev_path
|
|
self.device.name = devname
|
|
self.device.addr = devaddr
|
|
return True
|
|
|
|
def get_services(self):
|
|
'''
|
|
Retrieve Services found in the device connected
|
|
'''
|
|
try:
|
|
# Get current dbus objects
|
|
dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
|
|
dbus_objs = dbus_obj_mgr.GetManagedObjects()
|
|
|
|
# Get services
|
|
for path, interfaces in dbus_objs.items():
|
|
if GATT_SERVICE_IFACE in interfaces.keys():
|
|
if not path.startswith(self.device.path):
|
|
continue
|
|
received_service = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
|
|
# Retrieve all services on device iface path
|
|
# and set each service received
|
|
service = Service()
|
|
service.path = path
|
|
service.iface = dbus.Interface(received_service, GATT_SERVICE_IFACE)
|
|
service.props = dbus.Interface(received_service, DBUS_PROP_IFACE)
|
|
self.device.services.append(service)
|
|
|
|
if not self.device.services:
|
|
print('no services found for device: {}'.format(self.device.path))
|
|
return False
|
|
|
|
return True
|
|
|
|
except Exception as err:
|
|
raise Exception('Failed to get services: {}'.format(err))
|
|
|
|
def get_chars(self):
|
|
'''
|
|
Get characteristics of the services set for the device connected
|
|
'''
|
|
try:
|
|
if not self.device.services:
|
|
print('No services set for device: {}'.format(self.device.path))
|
|
return
|
|
|
|
# Read chars for all the services received for device
|
|
for service in self.device.services:
|
|
char_found = False
|
|
dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
|
|
dbus_objs = dbus_obj_mgr.GetManagedObjects()
|
|
for path, interfaces in dbus_objs.items():
|
|
if GATT_CHRC_IFACE in interfaces.keys():
|
|
if not path.startswith(self.device.path):
|
|
continue
|
|
if not path.startswith(service.path):
|
|
continue
|
|
# Set characteristics
|
|
received_char = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
|
|
char = Characteristic()
|
|
char.path = path
|
|
char.iface = dbus.Interface(received_char, GATT_CHRC_IFACE)
|
|
char.props = dbus.Interface(received_char, DBUS_PROP_IFACE)
|
|
service.chars.append(char)
|
|
char_found = True
|
|
|
|
if not char_found:
|
|
print('Characteristic not found for service: {}'.format(service.iface))
|
|
|
|
except Exception as err:
|
|
raise Exception('Failed to get characteristics : {}'.format(err))
|
|
|
|
def read_chars(self):
|
|
'''
|
|
Read value of characteristics
|
|
'''
|
|
try:
|
|
if not self.device.services:
|
|
print('No services set for device: {}'.format(self.device.path))
|
|
return
|
|
|
|
# Read chars for all services of device
|
|
for service in self.device.services:
|
|
# Read properties of characteristic
|
|
for char in service.chars:
|
|
# Print path
|
|
print('Characteristic: {}'.format(char.path))
|
|
# Print uuid
|
|
uuid = char.props.Get(GATT_CHRC_IFACE, 'UUID')
|
|
print('UUID: {}'.format(uuid))
|
|
# Print flags
|
|
flags = [flag for flag in char.props.Get(GATT_CHRC_IFACE, 'Flags')]
|
|
print('Flags: {}'.format(flags))
|
|
# Read value if `read` flag is present
|
|
if 'read' in flags:
|
|
value = char.iface.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
|
|
print('Value: {}'.format(value))
|
|
|
|
except Exception as err:
|
|
raise Exception('Failed to read characteristics : {}'.format(err))
|
|
|
|
def write_chars(self, new_value):
|
|
'''
|
|
Write to characteristics
|
|
'''
|
|
try:
|
|
if not self.device.services:
|
|
print('No services set for device: {}'.format(self.device.path))
|
|
return False
|
|
|
|
print('writing data to characteristics with read and write permission')
|
|
# Read chars of all services of device
|
|
for service in self.device.services:
|
|
if not service.chars:
|
|
print('No chars found for service: {}'.format(service.path))
|
|
continue
|
|
for char in service.chars:
|
|
flags = [flag.lower() for flag in char.props.Get(GATT_CHRC_IFACE, 'Flags')]
|
|
if not ('read' in flags and 'write' in flags):
|
|
continue
|
|
|
|
# Write new value to characteristic
|
|
curr_value = char.iface.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
|
|
print('current value: {}'.format(curr_value))
|
|
|
|
print('writing {} to characteristic {}'.format(new_value, char.path))
|
|
char.iface.WriteValue(new_value, {}, dbus_interface=GATT_CHRC_IFACE)
|
|
|
|
time.sleep(5)
|
|
updated_value = char.iface.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
|
|
print('updated value: {}'.format(updated_value))
|
|
|
|
if not (ord(new_value) == int(updated_value[0])):
|
|
print('write operation to {} failed'.format(char.path))
|
|
return False
|
|
print('write operation to {} successful'.format(char.path))
|
|
return True
|
|
|
|
except Exception as err:
|
|
raise Exception('Failed to write to characteristics: {}'.format(err))
|
|
|
|
def get_char_if_exists(self, char_uuid):
|
|
'''
|
|
Get char if exists for given uuid
|
|
'''
|
|
try:
|
|
for service in self.device.services:
|
|
for char in service.chars:
|
|
curr_uuid = char.props.Get(GATT_CHRC_IFACE, 'UUID')
|
|
if char_uuid.lower() in curr_uuid.lower():
|
|
return char
|
|
print('char {} not found'.format(char_uuid))
|
|
return False
|
|
except Exception as err:
|
|
raise Exception('Failed to get char based on uuid {} - {}'.format(char_uuid, err))
|
|
|
|
def get_service_if_exists(self, service_uuid):
|
|
try:
|
|
for service in self.device.services:
|
|
uuid = service.props.Get(GATT_SERVICE_IFACE, 'UUID')
|
|
if service_uuid.lower() in uuid.lower():
|
|
return service
|
|
print('service {} not found'.format(service_uuid))
|
|
return False
|
|
except Exception as err:
|
|
raise Exception('Failed to get service based on uuid {} - {}'.format(service_uuid, err))
|
|
|
|
def start_notify(self, char):
|
|
try:
|
|
notify_started = 0
|
|
notifying = char.props.Get(GATT_CHRC_IFACE, 'Notifying')
|
|
if notifying == 0:
|
|
# Start Notify
|
|
char.iface.StartNotify()
|
|
notify_started = 1
|
|
# Check notify started
|
|
for _ in range(10, 0, -1):
|
|
notifying = char.props.Get(GATT_CHRC_IFACE, 'Notifying')
|
|
if notifying == 1:
|
|
print('subscribe to notifications: on')
|
|
break
|
|
if notifying == 0:
|
|
print('Failed to start notifications')
|
|
return False
|
|
|
|
# Get updated value
|
|
for _ in range(10, 0, -1):
|
|
time.sleep(1)
|
|
char_value = char.props.Get(GATT_CHRC_IFACE, 'Value')
|
|
print(char_value)
|
|
|
|
return None
|
|
|
|
except Exception as err:
|
|
raise Exception('Failed to perform notification operation: {}'.format(err))
|
|
finally:
|
|
try:
|
|
if notify_started == 1:
|
|
# Stop notify
|
|
char.iface.StopNotify()
|
|
for _ in range(10, 0, -1):
|
|
notifying = char.props.Get(GATT_CHRC_IFACE, 'Notifying')
|
|
if notifying == 0:
|
|
print('subscribe to notifications: off')
|
|
break
|
|
if notifying == 1:
|
|
print('Failed to stop notifications')
|
|
except dbus.exceptions.DBusException as dbus_err:
|
|
print('Warning: Failure during cleanup for start notify : {}'.format(dbus_err))
|
|
|
|
def _create_mainloop(self):
|
|
'''
|
|
Create GLibMainLoop
|
|
'''
|
|
if not self.mainloop:
|
|
self.mainloop = GLib.MainLoop()
|
|
|
|
def register_gatt_app(self):
|
|
'''
|
|
Create Gatt Application
|
|
Register Gatt Application
|
|
'''
|
|
try:
|
|
# Create mainloop, if does not exist
|
|
self._create_mainloop()
|
|
|
|
# Create Gatt Application
|
|
self.gatt_app = lib_gatt.AlertNotificationApp(self.bus, self.adapter.path)
|
|
print('GATT Application created')
|
|
self.gatt_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, self.adapter.path), GATT_MANAGER_IFACE)
|
|
|
|
# Register Gatt Application
|
|
self.gatt_mgr.RegisterApplication(
|
|
self.gatt_app, {},
|
|
reply_handler=self.gatt_app_success_handler,
|
|
error_handler=self.gatt_app_error_handler)
|
|
self.mainloop.run()
|
|
|
|
except dbus.exceptions.DBusException as dbus_err:
|
|
raise DBusException('Failed to create GATT Application : {}'.format(dbus_err))
|
|
except Exception as err:
|
|
raise Exception('Failed to register Gatt Application: {}'.format(err))
|
|
|
|
def gatt_app_success_handler(self):
|
|
print('GATT Application successfully registered')
|
|
self.mainloop.quit()
|
|
|
|
def gatt_app_error_handler(self):
|
|
raise DBusException('Failed to register GATT Application')
|
|
|
|
def check_le_iface(self):
|
|
'''
|
|
Check if LEAdvertisingManager1 interface exists
|
|
'''
|
|
try:
|
|
dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
|
|
dbus_objs = dbus_obj_mgr.GetManagedObjects()
|
|
for path, iface in dbus_objs.items():
|
|
if LE_ADVERTISING_MANAGER_IFACE in iface:
|
|
le_adv_iface_path = path
|
|
break
|
|
# Check LEAdvertisingManager1 interface is found
|
|
assert le_adv_iface_path, '\n Cannot start advertising. LEAdvertisingManager1 Interface not found'
|
|
|
|
return le_adv_iface_path
|
|
|
|
except AssertionError:
|
|
raise
|
|
except Exception as err:
|
|
raise Exception('Failed to find LEAdvertisingManager1 interface: {}'.format(err))
|
|
|
|
def register_adv(self, adv_host_name, adv_type, adv_uuid):
|
|
try:
|
|
# Gatt Application is expected to be registered
|
|
if not self.gatt_app:
|
|
print('No Gatt Application is registered')
|
|
return
|
|
|
|
adv_iface_index = 0
|
|
|
|
# Create mainloop, if does not exist
|
|
self._create_mainloop()
|
|
|
|
# Check LEAdvertisingManager1 interface exists
|
|
le_iface_path = self.check_le_iface()
|
|
|
|
# Create Advertisement data
|
|
leadv_obj = lib_gap.Advertisement(
|
|
self.bus,
|
|
adv_iface_index,
|
|
adv_type,
|
|
adv_uuid,
|
|
adv_host_name)
|
|
print('Advertisement registered')
|
|
|
|
# Register Advertisement
|
|
leadv_mgr_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, le_iface_path), LE_ADVERTISING_MANAGER_IFACE)
|
|
leadv_mgr_iface_obj.RegisterAdvertisement(
|
|
leadv_obj.get_path(), {},
|
|
reply_handler=self.adv_success_handler,
|
|
error_handler=self.adv_error_handler)
|
|
|
|
# Handler to read events received and exit from mainloop
|
|
GLib.timeout_add_seconds(3, self.check_adv)
|
|
|
|
self.mainloop.run()
|
|
|
|
except AssertionError:
|
|
raise
|
|
except dbus.exceptions.DBusException as dbus_err:
|
|
raise DBusException('Failure during registering advertisement : {}'.format(dbus_err))
|
|
except Exception as err:
|
|
raise Exception('Failure during registering advertisement : {}'.format(err))
|
|
else:
|
|
try:
|
|
try:
|
|
# Stop Notify if not already stopped
|
|
chars = self.gatt_app.service.get_characteristics()
|
|
for char in chars:
|
|
if char.uuid == lib_gatt.CHAR_UUIDS['UNREAD_ALERT_STATUS_UUID']:
|
|
if char.notifying:
|
|
char.StopNotify()
|
|
except dbus.exceptions.DBusException as dbus_err:
|
|
print('Warning: {}'.format(dbus_err))
|
|
|
|
try:
|
|
# Unregister Advertisement
|
|
leadv_mgr_iface_obj.UnregisterAdvertisement(leadv_obj.get_path())
|
|
except dbus.exceptions.DBusException as dbus_err:
|
|
print('Warning: {}'.format(dbus_err))
|
|
|
|
try:
|
|
# Remove advertising data
|
|
dbus.service.Object.remove_from_connection(leadv_obj)
|
|
except LookupError as err:
|
|
print('Warning: Failed to remove connection from dbus for advertisement object: {} - {}'.format(leadv_obj, err))
|
|
|
|
try:
|
|
# Unregister Gatt Application
|
|
self.gatt_mgr.UnregisterApplication(self.gatt_app.get_path())
|
|
except dbus.exceptions.DBusException as dbus_err:
|
|
print('Warning: {}'.format(dbus_err))
|
|
|
|
try:
|
|
# Remove Gatt Application
|
|
dbus.service.Object.remove_from_connection(self.gatt_app)
|
|
except LookupError as err:
|
|
print('Warning: Failed to remove connection from dbus for Gatt application object: {} - {}'.format(self.gatt_app, err))
|
|
|
|
except RuntimeError as err:
|
|
print('Warning: Failure during cleanup of Advertisement: {}'.format(err))
|
|
|
|
def adv_success_handler(self):
|
|
print('Registered Advertisement successfully')
|
|
|
|
def adv_error_handler(self, err):
|
|
raise DBusException('{}'.format(err))
|
|
|
|
def check_adv(self):
|
|
'''
|
|
Handler to check for events triggered (read/write/subscribe)
|
|
for advertisement registered for AlertNotificationApp
|
|
'''
|
|
try:
|
|
retry = 10
|
|
# Exit loop if read and write and subscribe is successful
|
|
if self.gatt_app.service.get_char_status(lib_gatt.CHAR_UUIDS['SUPPORT_NEW_ALERT_UUID'], 'read') and \
|
|
self.gatt_app.service.get_char_status(lib_gatt.CHAR_UUIDS['ALERT_NOTIF_UUID'], 'write') and \
|
|
self.gatt_app.service.get_char_status(lib_gatt.CHAR_UUIDS['UNREAD_ALERT_STATUS_UUID'], 'notify'):
|
|
if self.mainloop.is_running():
|
|
self.mainloop.quit()
|
|
# return False to stop polling
|
|
return False
|
|
|
|
self.loop_cnt += 1
|
|
print('Check read/write/subscribe events are received...Retry {}'.format(self.loop_cnt))
|
|
|
|
# Exit loop if max retry value is reached and
|
|
# all three events (read and write and subscribe) have not yet passed
|
|
# Retry total 10 times
|
|
if self.loop_cnt == (retry - 1):
|
|
if self.mainloop.is_running():
|
|
self.mainloop.quit()
|
|
# return False to stop polling
|
|
return False
|
|
|
|
# return True to continue polling
|
|
return True
|
|
|
|
except RuntimeError as err:
|
|
print('Failure in advertisment handler: {}'.format(err))
|
|
if self.mainloop.is_running():
|
|
self.mainloop.quit()
|
|
# return False to stop polling
|
|
return False
|
|
|
|
def disconnect(self):
|
|
'''
|
|
Disconnect device
|
|
'''
|
|
try:
|
|
if not self.device or not self.device.iface:
|
|
return
|
|
print('disconnecting device')
|
|
# Disconnect device
|
|
device_conn = self.device.props.Get(DEVICE_IFACE, 'Connected')
|
|
if device_conn == 1:
|
|
self.device.iface.Disconnect(dbus_interface=DEVICE_IFACE)
|
|
for cnt in range(10, 0, -1):
|
|
time.sleep(5)
|
|
device_conn = self.device.props.Get(DEVICE_IFACE, 'Connected')
|
|
if device_conn == 0:
|
|
print('device disconnected')
|
|
break
|
|
print('number of retries left ({})'.format(cnt - 1))
|
|
if device_conn == 1:
|
|
print('failed to disconnect device')
|
|
|
|
self.adapter.iface.RemoveDevice(self.device.iface)
|
|
self.device = None
|
|
|
|
except dbus.exceptions.DBusException as dbus_err:
|
|
print('Warning: {}'.format(dbus_err))
|
|
except Exception as err:
|
|
raise Exception('Failed to disconnect device: {}'.format(err))
|