leda_python/deviceMbus.py (503 lines of code) (raw):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ====#====#====#====
# file: deviceMbus
# time: 1/30/18
# ====#====#====#====
from . import ledaException as exception
from gi.repository import GLib
import dbus.mainloop.glib
from . import mbusConfig
from .refactoring import dbus_service
import threading
import logging
import json
from . import mbus
import os
import re
from . import json_coder
# Module-level logger, named after this module; every class and function below logs through it.
_logger = logging.getLogger(__name__)
class SyncMsg_Event(object):
    """Couple a ``threading.Event`` with a single message slot.

    One thread blocks in :meth:`wait`; another thread hands a value over and
    wakes it up with :meth:`set`.
    """

    def __init__(self):
        # The event must exist before clear() touches it.
        self.event = threading.Event()
        self.msg = None
        self.clear()

    def clear(self):
        """Forget the stored message and reset the event flag."""
        self.msg = None
        self.event.clear()

    def wait(self, time_s):
        """Block for at most ``time_s`` seconds, then return the stored message.

        :param time_s: timeout handed to ``threading.Event.wait``.
        :return: the current content of the message slot; ``None`` when
                 nothing has been stored.
        """
        self.event.wait(timeout=time_s)
        return self.msg

    def set(self, msg):
        """Store ``msg`` and signal the event so that a waiter resumes."""
        self.msg = msg
        self.event.set()
# Becomes True once the background GLib main-loop thread has been started
# (driver_service.driver_init / driver_init_with_driverId set it).
mbusLoopFlag = False
def mbus_loop():
    """Run a GLib main loop and end the whole process if it fails.

    Used as a thread target (driver_service starts it as the "mbusLoop"
    thread). ``mainloop.run()`` blocks; if creating or running the loop raises
    anything at all, the failure is logged together with its traceback and the
    process is terminated immediately through ``os._exit(0)``.
    """
    try:
        _logger.debug("mbus main looping...")
        mainloop = GLib.MainLoop()
        mainloop.run()
    except BaseException:
        # Deliberately as broad as a bare ``except``: whatever stops the loop,
        # the process must go down. Record the cause first, because
        # os._exit() skips all normal interpreter cleanup and reporting.
        _logger.exception("mbus main loop terminated unexpectedly")
        _logger.debug(">>>>>>>>>>>>> driver existed<<<<<<<<<<<<<<<")
        os._exit(0)
class device_callback(object):
    """Base class for the callbacks a device implementation has to provide.

    Every method here raises ``LedaCallBackException``; a concrete device is
    expected to override them. The argument and return shapes documented below
    are the ones ``device_service`` in this module relies on when it invokes
    the callbacks.
    """

    def callService(self, name, input):
        """Execute a custom service of the device.

        :param name: service (method) name.
        :param input: the value found under ``"params"`` in the JSON request,
            already parsed, e.g.::

                {
                    "args1": xxx,
                    "args2": yyy
                }

        :return: a 2-tuple ``(code, output)``. ``code`` must be an ``int``
            status code and ``output`` a ``dict`` with the service result,
            e.g.::

                {
                    "key1": xxx,
                    "key2": yyy,
                    ...
                }
        """
        raise exception.LedaCallBackException("callService is empty")

    def getprofile_cb(self):
        """Return the device profile.

        :return: the device model (properties, services, events) as a JSON
            string; the caller in this module requires a ``str`` and parses it
            with ``json.loads``.
        """
        raise exception.LedaCallBackException("getprofile_cb is empty")

    def getProperties(self, input):
        """Read property values from the device.

        :param input: the parsed ``"params"`` value of the request; described
            by the original documentation as an array of property names,
            ``[property1, property2]``.
        :return: a 2-tuple ``(code, values)`` where ``code`` is an ``int``
            status code and ``values`` a ``dict`` such as
            ``{property1: xxx, property2: yyy}``.
        """
        raise exception.LedaCallBackException("getProperties is empty")

    def setProperties(self, input):
        """Write property values to the device.

        :param input: the parsed ``"params"`` value of the request: the
            properties to set in key-value form, e.g.
            ``{property1: xxx, property2: yyy}``.
        :return: a 2-tuple ``(code, result)`` where ``code`` is an ``int``
            status code and ``result`` a ``dict``; return ``{}`` when there is
            nothing to report.
        """
        raise exception.LedaCallBackException("setProperties is empty")
class device_service(object):
    """Bus-side representation of one device.

    The class builds the D-Bus object that exposes ``getDeviceProfile`` and
    ``callServices`` for the device, and sends property / event reports as
    unicast signals. The exported methods forward to ``bus_callback_object``
    (see ``device_callback`` for the expected callback contract).
    """

    def __init__(self, cloud_id, device_name, product_key, bus_callback_object):
        self.cloud_id = cloud_id
        self.device_name = device_name
        self.product_key = product_key
        # Object providing getprofile_cb / getProperties / setProperties / callService.
        self.bus_callback_object = bus_callback_object
        # Bus connection handle. This class only reads it and resets it to
        # None; it is presumably assigned by code outside this file when the
        # device goes online -- TODO confirm.
        self.deviceMbusHandle = None
        # Exported D-Bus object; same remark as for the handle above.
        self.deviceMbusObject = None

    def get_cloud_id(self):
        """Return the cloud id this service was created with."""
        return self.cloud_id

    def _getDeviceProfile(self, interface):
        """Create the ``getDeviceProfile`` D-Bus method for ``interface``.

        The returned function becomes a method of the dynamically created
        object class (see ``_createMbusDynamicObject``). Inside it ``self`` is
        that D-Bus object, which carries ``callback_funs`` and ``cloud_id`` as
        class attributes; it is not this ``device_service`` instance.

        The method always answers with a JSON string holding ``code`` and
        ``message`` and, on success, ``params.deviceProfile``.
        """

        @dbus.service.method(interface, out_signature='s')
        def getDeviceProfile(self):
            _logger.debug("cloud_id:%s, method: getDeviceProfile", self.cloud_id)

            def reply(ret):
                # Log and serialise one reply dict.
                _logger.debug("getDeviceProfile(cloud_id:%s): retMsg: %s", self.cloud_id, ret)
                return json.dumps(ret, ensure_ascii=False)

            try:
                profile = self.callback_funs.getprofile_cb()
                if not isinstance(profile, str):
                    _logger.warning("getprofile_cb(cloud_id:%s) return args type is invalid: %s",
                                    self.cloud_id, type(profile))
                    return reply({
                        "code": exception.LEDA_ERROR_INVAILD_PARAM,
                        "message": "getprofile_cb(cloud_id:%s) return args type is invalid: %s" % (
                            self.cloud_id, type(profile)),
                    })
                profile_dict = json.loads(profile)
            except (AttributeError, ValueError, TypeError) as err:
                # Missing callback or a profile that is not valid JSON.
                _logger.warning('%s', err)
                return reply({
                    "code": exception.LEDA_ERROR_INVAILD_PARAM,
                    "message": "%s" % (err,),
                })
            except BaseException:
                # As broad as the original bare ``except``: the bus caller
                # must always get a reply, whatever the callback raised.
                _logger.exception("Err")
                return reply({
                    "code": exception.LEDA_ERROR_FAILED,
                    "message": "unkonwn error",
                })

            return reply({
                "code": exception.LEDA_SUCCESS,
                "message": "successfully",
                "params": {
                    "deviceProfile": profile_dict,
                },
            })

        return getDeviceProfile

    def _callServices(self, interface):
        """Create the ``callServices`` D-Bus method for ``interface``.

        The exported method takes a method name and a JSON request string. The
        value under ``"params"`` of the request is handed to the callbacks:

        * ``"get"``  -> ``callback_funs.getProperties``
        * ``"set"``  -> ``callback_funs.setProperties``
        * otherwise  -> ``callback_funs.callService(methodName, params)``

        Each callback must return ``(code, dict)``. The method always answers
        with a JSON string containing ``code``, ``message`` and ``params``.
        As with ``_getDeviceProfile``, ``self`` inside the exported method is
        the dynamically created D-Bus object.
        """
        # Text for every status code; only looked up for codes that passed
        # the success check below.
        code_info = {
            exception.LEDA_SUCCESS: 'successfully',
            exception.LEDA_ERROR_INVAILD_PARAM: 'invalid params',
            exception.LEDA_ERROR_FAILED: 'exec failed',
            exception.LEDA_ERROR_TIMEOUT: 'timeout',
            exception.LEDA_ERROR_NOT_SUPPORT: 'not support',
            exception.LEDA_ERROR_PROPERTY_NOT_EXIST: 'property not exist',
            exception.LEDA_ERROR_PROPERTY_READ_ONLY: 'property read only',
            exception.LEDA_ERROR_PROPERTY_WRITE_ONLY: 'property write only',
            exception.LEDA_ERROR_SERVICE_NOT_EXIST: 'service not exist',
            exception.LEDA_ERROR_SERVICE_INPUT_PARAM: 'invalid service input params',
        }

        def handle_property(obj, op, in_args):
            """Serve a "get" or "set" request and return the JSON reply."""

            def reply(ret, **dump_kwargs):
                _logger.debug("%s(cloud_id:%s): retMsg: %s", op, obj.cloud_id, ret)
                return json.dumps(ret, ensure_ascii=False, **dump_kwargs)

            if op == "get":
                code, ret_info = obj.callback_funs.getProperties(in_args)
            else:
                code, ret_info = obj.callback_funs.setProperties(in_args)

            if not isinstance(ret_info, dict) or not isinstance(code, int):
                _logger.warning("%s(cloud_id:%s) return args type is invalid: %s",
                                op, obj.cloud_id, type(ret_info))
                return reply({
                    "code": exception.LEDA_ERROR_INVAILD_PARAM,
                    "message": "%s(cloud_id:%s) return args type is invalid: %s" % (
                        op, obj.cloud_id, type(ret_info)),
                    "params": {},
                })
            if code != exception.LEDA_SUCCESS:
                return reply({
                    "code": exception.LEDA_ERROR_FAILED,
                    "message": "%s(cloud_id:%s) exec failed" % (op, obj.cloud_id),
                    "params": {},
                })
            if op == "get":
                # Only "get" returns callback data, serialised with the
                # project's JSON encoder.
                return reply({
                    "code": code,
                    "message": code_info[code],
                    "params": ret_info,
                }, cls=json_coder.Json_Encoder)
            return reply({
                "code": code,
                "message": code_info[code],
                "params": {},
            })

        @dbus.service.method(interface, in_signature='ss', out_signature='s')
        def callServices(self, methodName, inMsg):
            _logger.debug("callServices(cloud_id:%s) in params: method: %s, args: %s",
                          self.cloud_id, methodName, inMsg)

            def reply(ret, level=logging.DEBUG):
                _logger.log(level, "callServices(cloud_id:%s): %s retMsg: %s",
                            self.cloud_id, methodName, ret)
                return json.dumps(ret, ensure_ascii=False)

            try:
                in_args = json.loads(inMsg)["params"]
                if methodName == "get" or methodName == "set":
                    return handle_property(self, methodName, in_args)

                code, output = self.callback_funs.callService(methodName, in_args)
                if not isinstance(code, int) or not isinstance(output, dict):
                    _logger.warning("callService(cloud_id:%s) return args type is invalid", self.cloud_id)
                    text = "callService(cloud_id:%s) return args type is invalid" % (self.cloud_id,)
                    return reply({
                        "code": exception.LEDA_ERROR_INVAILD_PARAM,
                        "message": text,
                        "params": {
                            "code": exception.LEDA_ERROR_INVAILD_PARAM,
                            "message": text,
                            "data": {},
                        },
                    })
                if code != exception.LEDA_SUCCESS:
                    text = "callService(cloud_id:%s) exec failed" % (self.cloud_id,)
                    return reply({
                        "code": exception.LEDA_ERROR_FAILED,
                        "message": text,
                        "params": {
                            "code": exception.LEDA_ERROR_FAILED,
                            "message": text,
                            "data": {},
                        },
                    })
            except (AttributeError, ValueError, TypeError, KeyError) as err:
                # Bad request JSON, missing "params", missing callback, or a
                # callback result that cannot be unpacked / serialised.
                _logger.exception('Err')
                _logger.warning('%s', err)
                return reply({
                    "code": exception.LEDA_ERROR_INVAILD_PARAM,
                    "message": "%s" % (err,),
                    "params": {},
                })
            except BaseException:
                # As broad as the original bare ``except``: always reply.
                _logger.exception("Err")
                return reply({
                    "code": exception.LEDA_ERROR_FAILED,
                    "message": "unkonwn error",
                    "params": {},
                }, level=logging.WARNING)

            # Custom service succeeded: wrap its result in the outer envelope.
            return reply({
                "code": exception.LEDA_SUCCESS,
                "message": "successfully",
                "params": {
                    "code": code,
                    "message": code_info[code],
                    "data": output,
                },
            })

        return callServices

    def _createMbusDynamicObject(self):
        """Create and register the D-Bus object of this device.

        A class named ``Device_<product_key><device_name>`` is built at run
        time so that its methods can be bound to the device-specific
        interface ``CMP_DEVICE_WKN_PREFIX + cloud_id``. Requires
        ``self.deviceMbusHandle`` to be set.

        :return: whatever ``deviceMbusHandle.createObject`` returns.
        """
        interface = mbusConfig.CMP_DEVICE_WKN_PREFIX + self.cloud_id
        class_attrs = {
            "callback_funs": self.bus_callback_object,
            "cloud_id": self.cloud_id,
            "callServices": self._callServices(interface),
            "getDeviceProfile": self._getDeviceProfile(interface),
        }
        object_class = type("Device_" + self.product_key + self.device_name,
                            (dbus_service.DbusObject,), class_attrs)
        object_path = '/' + interface.replace('.', '/')
        return self.deviceMbusHandle.createObject(object_class, object_path)

    def releaseMbusObject(self):
        """Unexport the device object, release its bus name and drop the handle.

        Does nothing when there is no bus handle.
        """
        if not self.deviceMbusHandle:
            return
        bus = self.deviceMbusHandle.getBus()
        well_known_name = mbusConfig.CMP_DEVICE_WKN_PREFIX + self.cloud_id
        object_path = '/' + well_known_name.replace('.', '/')
        if self.deviceMbusObject is not None:
            self.deviceMbusObject.remove_from_connection(bus, object_path)
            self.deviceMbusObject = None
        self.deviceMbusHandle.releaseName()
        self.deviceMbusHandle = None

    def _send_signal(self, signal_name, payload):
        """Send ``payload`` as a string signal from this device to ``DMP_SUB_WKN``."""
        # The handle's bus name doubles as the interface name; the object
        # path is that name with dots turned into slashes.
        src_interface = self.deviceMbusHandle.getName()
        src_object_path = "/" + src_interface.replace(".", "/")
        self.deviceMbusHandle.unicastSignal(src_object_path, src_interface, mbusConfig.DMP_SUB_WKN, 's',
                                            signal_name, payload)

    def device_report_property(self, report_info):
        '''
        Report property values as a ``propertiesChanged`` signal.

        :param report_info: JSON string in key-value form, for example:
            {
                "property1": {
                    "value" : "xxx",
                    "time" : 1524448722000
                },
                "property2": {
                    "value" : "yyy",
                    "time" : 1524448722000
                }
                ...
            }
        :return: None
        :raises LedaReportPropertyException: when there is no bus handle (the
            device is not online) or ``report_info`` is not a ``str``.
        '''
        if self.deviceMbusHandle is None:
            _logger.warning("device(%s) can't report property unless online ", self.cloud_id)
            raise exception.LedaReportPropertyException(
                "device(%s) can't report property unless online" % (self.cloud_id,))
        if not isinstance(report_info, str):
            raise exception.LedaReportPropertyException(
                "device(%s):device_report_property,params type is invalid: %s" % (self.cloud_id, type(report_info)))

        self._send_signal("propertiesChanged", report_info)
        _logger.info("Device(%s): report properties: %s", self.cloud_id, report_info)

    def device_report_event(self, name, report_info):
        '''
        Report an event as a signal named after the event.

        :param name: event name; at most ``mbusConfig.STRING_NAME_MAX_LEN`` characters.
        :param report_info: JSON string carrying the event data, for example:
            {
                "params": {
                    "value" : {
                        "key1":"value1",
                        "key2":"value2"
                    },
                    "time" : 1524448722000
                }
            }
        :return: None
        :raises LedaReportEventException: when there is no bus handle (the
            device is not online), an argument is not a ``str``, or ``name``
            is too long.
        '''
        if self.deviceMbusHandle is None:
            _logger.warning("device(%s) can't report event unless online ", self.cloud_id)
            raise exception.LedaReportEventException(
                "device(%s) can't report event unless online" % (self.cloud_id,))
        if not isinstance(report_info, str) or not isinstance(name, str):
            raise exception.LedaReportEventException(
                "device(%s):device_report_event,params type is invalid" % (self.cloud_id,))
        if len(name) > mbusConfig.STRING_NAME_MAX_LEN:
            # Both placeholders need a value; supplying only len(name) made
            # the formatting itself fail with TypeError.
            raise exception.LedaReportEventException(
                "device(%s):device_report_event,params name is too long(%s)" % (self.cloud_id, len(name)))

        self._send_signal(name, report_info)
        _logger.info("Device(%s): report event(%s): %s", self.cloud_id, name, report_info)
class driver_service(object):
    """Bus-side representation of a driver.

    The class connects the driver to the message bus, exports a D-Bus object
    offering ``getDeviceList`` and ``notify_config``, starts the shared GLib
    main-loop thread and feeds the watchdog.
    """

    def __init__(self):
        self.driverMbusHandle = None
        self.driverMbusObject = None
        # Maps a key (called pk_dn in this code) to a two-item list
        # [cloud_id, device object]. Nothing in this file adds entries; it is
        # filled by code elsewhere.
        self.device_service_dict = {}
        self.deviceServiceDictLock = threading.Lock()
        # Exactly one of these is set, depending on which init method ran.
        self.driver_name = None
        self.driver_id = None

    def _exit(self, connection):
        """Disconnection hook: log and terminate the process at once."""
        _logger.warning("the connection is Abnormal(closed or bus daemon crashed),the process exited automatically")
        os._exit(0)

    def _registered_name(self):
        """Return the name the driver object was registered under.

        ``driver_init_with_driverId`` stores it in ``driver_id`` and
        ``driver_init`` in ``driver_name``; ``None`` before either has run.
        """
        if self.driver_id is not None:
            return self.driver_id
        return self.driver_name

    def _notify_config(self, interface):
        """Create the ``notify_config`` D-Bus method for ``interface``.

        When the exported object has a truthy ``config_callback_obj``, its
        ``deviceConfigCB(key, value)`` is run on a daemon thread so that the
        bus call returns immediately. The method always returns
        ``LEDA_SUCCESS``.
        """

        @dbus.service.method(interface, in_signature='ss', out_signature='i')
        def notify_config(self, key, value):
            if self.config_callback_obj:
                worker = threading.Thread(target=self.config_callback_obj.deviceConfigCB,
                                          args=(key, value), daemon=True)
                worker.start()
            return exception.LEDA_SUCCESS

        return notify_config

    def _getDeviceList(self, interface):
        """Create the ``getDeviceList`` D-Bus method for ``interface``.

        The exported method accepts ``""`` (all devices), ``"online"``
        (devices whose ``deviceMbusHandle`` is set) or ``"offline"`` (devices
        whose ``deviceMbusHandle`` is ``None``). It returns the JSON string
        ``{"params": {"devNum": <count>, "devList": [<cloud_id>, ...]}}``.
        Any other state yields an empty list and a warning.
        """
        # Each filter receives one [cloud_id, device object] entry.
        state_filters = {
            "": lambda entry: True,
            "online": lambda entry: entry[1].deviceMbusHandle is not None,
            "offline": lambda entry: entry[1].deviceMbusHandle is None,
        }

        @dbus.service.method(interface, in_signature='s', out_signature='s')
        def getDeviceList(self, deviceState):
            cloud_ids = []
            with self.deviceServiceDictLock:
                try:
                    wanted = state_filters.get(deviceState)
                    if wanted is None:
                        _logger.warning("method: getDeviceList inparams is invalid")
                    else:
                        # Append one by one so that entries collected before
                        # a failure are still reported.
                        for entry in self.device_service_dict.values():
                            if wanted(entry):
                                cloud_ids.append(entry[0])
                except Exception:
                    _logger.exception("Err")
            out_msg = {
                "params": {
                    "devNum": len(cloud_ids),
                    "devList": cloud_ids,
                }
            }
            return json.dumps(out_msg, ensure_ascii=False)

        return getDeviceList

    def _releaseMbusObject(self):
        """Disconnect all devices, unexport the driver object, release the bus name.

        Does nothing when there is no bus handle. If the bus cannot be
        obtained from the handle, the error is logged and the remaining steps
        are skipped.
        """
        if not self.driverMbusHandle:
            return

        with self.deviceServiceDictLock:
            for entry in self.device_service_dict.values():
                entry[0] = None  # clean cloud_id
                # NOTE(review): device_service in this file defines no
                # device_disconnect(); confirm what type is stored here.
                entry[1].device_disconnect()
            self.device_service_dict = {}

        try:
            bus = self.driverMbusHandle.getBus()
        except Exception:
            _logger.exception('Err')
            return

        # Use whichever name the object was registered with. Relying on
        # driver_id alone failed after driver_init(), which only sets
        # driver_name and leaves driver_id as None.
        well_known_name = mbusConfig.CMP_DRIVER_WKN_PREFIX + self._registered_name()
        object_path = '/' + well_known_name.replace('.', '/')
        if self.driverMbusObject is not None:
            self.driverMbusObject.remove_from_connection(bus, object_path)
            self.driverMbusObject = None
        self.driverMbusHandle.releaseName()

    def _createMbusDynamicObject(self, driver_name):
        """Create and register the D-Bus object of this driver.

        A class named ``Driver_<driver_name>`` is built at run time so that
        its methods can be bound to the interface
        ``CMP_DRIVER_WKN_PREFIX + driver_name``. The class shares this
        instance's device dictionary and lock.

        :return: whatever ``driverMbusHandle.createObject`` returns.
        """
        interface = mbusConfig.CMP_DRIVER_WKN_PREFIX + driver_name
        class_attrs = {
            "device_service_dict": self.device_service_dict,
            "deviceServiceDictLock": self.deviceServiceDictLock,
            "config_callback_obj": None,
            "getDeviceList": self._getDeviceList(interface),
            "notify_config": self._notify_config(interface),
        }
        object_class = type("Driver_" + driver_name, (dbus.service.Object,), class_attrs)
        object_path = '/' + interface.replace('.', '/')
        return self.driverMbusHandle.createObject(object_class, object_path)

    def _openMonitorMbusDaemon(self):
        """Arrange for ``_exit`` to run when the bus connection is lost."""
        bus = self.driverMbusHandle.getBus()
        bus.call_on_disconnection(self._exit)

    def _prepare_glib_loop(self):
        """Install the GLib D-Bus main loop as default until the loop thread runs."""
        if not mbusLoopFlag:
            dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    def _connect(self, name):
        """Connect to the bus as ``name`` and export the driver object.

        :raises LedaException: "mbus existed" when the connection attempt
            raised ``SystemExit``, "mbus connect failed" on any other error.
        """
        try:
            wkn = mbusConfig.CMP_DRIVER_WKN_PREFIX + name
            self.driverMbusHandle = mbus.MbusConnect(wkn)
            self._openMonitorMbusDaemon()
            self.driverMbusObject = self._createMbusDynamicObject(name)
        except SystemExit:
            raise exception.LedaException("mbus existed")
        except Exception as err:
            _logger.exception("Err")
            raise exception.LedaException("mbus connect failed") from err

    def _ensure_loop_thread(self):
        """Start the shared "mbusLoop" daemon thread if it is not running yet."""
        global mbusLoopFlag
        if not mbusLoopFlag:
            loop_thread = threading.Thread(target=mbus_loop, name="mbusLoop", daemon=True)
            loop_thread.start()
            mbusLoopFlag = True

    def driver_init_with_driverId(self, driver_id):
        """Initialise the driver on the bus, identified by ``driver_id``.

        :param driver_id: ``str`` appended to ``CMP_DRIVER_WKN_PREFIX`` to form
            the bus name and interface.
        :raises LedaParamsException: ``driver_id`` is not a ``str``.
        :raises LedaException: connecting or exporting the object failed.
        """
        self._prepare_glib_loop()
        if not isinstance(driver_id, str):
            raise exception.LedaParamsException("driver_init_with_driverId: driver_id type is invalid")
        self._connect(driver_id)
        self.driver_id = driver_id
        self._ensure_loop_thread()

    def driver_init(self, driver_name):
        """Initialise the driver on the bus, identified by ``driver_name``.

        :param driver_name: ``str`` appended to ``CMP_DRIVER_WKN_PREFIX`` to
            form the bus name and interface.
        :raises LedaParamsException: ``driver_name`` is not a ``str``.
        :raises LedaException: connecting or exporting the object failed.
        """
        self._prepare_glib_loop()
        if not isinstance(driver_name, str):
            raise exception.LedaParamsException("driver_init: driver_name type is invalid")
        self._connect(driver_name)
        self.driver_name = driver_name
        self._ensure_loop_thread()

    def driver_set_watchdog(self, thread_name, count_down):
        '''
        Feed the watchdog for one thread.

        :param thread_name: name of the thread to keep alive; a non-empty
            ``str`` of at most ``mbusConfig.STRING_NAME_MAX_LEN`` characters.
        :param count_down: countdown time; -1 means stop keeping alive.
        :return: None
        :raises LedaBusHandleException: the driver has no bus handle.
        :raises LedaRPCMethodException: an argument failed validation.
        :raises LedaFeedDogException: the bus call itself failed.
        '''
        if self.driverMbusHandle is None:
            raise exception.LedaBusHandleException("mbus Handle is None")

        # NOTE(review): the messages below say "is valid" although they are
        # raised for invalid values; wording kept unchanged for callers.
        name_ok = isinstance(thread_name, str) and 0 < len(thread_name) <= mbusConfig.STRING_NAME_MAX_LEN
        if not name_ok:
            raise exception.LedaRPCMethodException(
                "driver_set_watchdog: thread_name is valid:%s" % (thread_name,))

        # NOTE(review): -2 passes this check although only -1 is documented
        # as a special value -- confirm whether the bound should be -1.
        if not isinstance(count_down, int) or count_down == 0 or count_down < -2:
            raise exception.LedaRPCMethodException(
                "driver_set_watchdog: count_down is valid:%s" % (count_down,))

        try:
            self.driverMbusHandle.feedDog(thread_name, count_down)
        except Exception as err:
            _logger.exception("Err")
            raise exception.LedaFeedDogException("feed dog failed") from err

    def driver_exit(self):
        """Release all bus resources and close the connection, if connected."""
        if self.driverMbusHandle is None:
            _logger.debug("driverMbusHandle is None")
            return
        self._releaseMbusObject()
        self.driverMbusHandle.close()
        self.driverMbusHandle = None