OPC UA 方法调用:动态检测与转换自定义输入参数

OPC UA 方法调用:动态检测与转换自定义输入参数

本文档旨在指导开发者在使用 OPC UA 客户端调用方法时,如何动态检测并正确转换自定义输入参数。通过读取方法的 InputArguments 属性,获取参数的数据类型信息,并将其映射到相应的 python 类,从而实现参数的自动转换,避免手动指定数据类型的繁琐过程,提高代码的灵活性和可维护性。

在 OPC UA 通信中,调用方法时,需要将参数转换为服务端期望的数据类型。对于标准数据类型,这通常不是问题。但当涉及到自定义数据类型时,客户端需要预先知道参数的具体结构,这在动态客户端场景下变得困难。本文将介绍如何利用 asyncua 库动态获取方法参数类型,并进行相应的类型转换

获取方法参数类型信息

OPC UA 方法的参数信息存储在其 0:InputArguments 变量中。通过读取该变量的值,我们可以获得一个 Argument 对象的列表,每个对象包含了参数的名称、数据类型等信息。

from asyncua import Client from asyncua.ua import ObjectIds  async def get_method_input_arguments(client, method_node):     """     获取 OPC UA 方法的输入参数信息。      Args:         client: asyncua.Client 实例。         method_node: 方法节点的 Node 对象。      Returns:         一个包含 Argument 对象的列表。     """     input_arguments_node = await client.get_node(method_node.nodeid.to_string() + ".InputArguments")     arguments = await input_arguments_node.read_value()     return arguments  # 示例用法 async def main():     async with Client("opc.tcp://your_opcua_server:4840") as client:         # 假设 method_node 是你要调用的方法节点         method_node = await client.get_node("ns=2;i=1234")  # 替换为你的方法节点 ID         arguments = await get_method_input_arguments(client, method_node)         for arg in arguments:             print(f"参数名称: {arg.Name}, 数据类型: {arg.DataType}")  if __name__ == "__main__":     import asyncio     asyncio.run(main())

将 DataType NodeId 映射到 Python 类

Argument 对象中的 DataType 字段是一个 NodeId,它标识了参数的数据类型。我们需要将这个 NodeId 映射到相应的 Python 类,以便创建正确类型的参数值。

from asyncua.ua import ObjectIds from asyncua import ua  base_type_list = {     ua.NodeId(ObjectIds.Null): None,     ua.NodeId(ObjectIds.Boolean): bool,     ua.NodeId(ObjectIds.SByte): ua.SByte,     ua.NodeId(ObjectIds.Byte): ua.Byte,     ua.NodeId(ObjectIds.Int16): ua.Int16,     ua.NodeId(ObjectIds.UInt16): ua.UInt16,     ua.NodeId(ObjectIds.Int32): ua.Int32,     ua.NodeId(ObjectIds.UInt32): ua.UInt32,     ua.NodeId(ObjectIds.Int64): ua.Int64,     ua.NodeId(ObjectIds.UInt64): ua.UInt64,     ua.NodeId(ObjectIds.Float): ua.Float,     ua.NodeId(ObjectIds.Double): ua.Double,     ua.NodeId(ObjectIds.String): ua.String,     ua.NodeId(ObjectIds.DateTime): ua.DateTime,     ua.NodeId(ObjectIds.Guid): ua.Guid,     ua.NodeId(ObjectIds.ByteString): ua.ByteString,     ua.NodeId(ObjectIds.XmlElement): ua.XmlElement,     ua.NodeId(ObjectIds.NodeId): ua.NodeId,     ua.NodeId(ObjectIds.ExpandedNodeId): ua.ExpandedNodeId,     ua.NodeId(ObjectIds.StatusCode): ua.StatusCode,     ua.NodeId(ObjectIds.QualifiedName): ua.QualifiedName,     ua.NodeId(ObjectIds.LocalizedText): ua.LocalizedText,     ua.NodeId(ObjectIds.Structure): ua.ExtensionObject,     ua.NodeId(ObjectIds.DataValue): ua.DataValue,     ua.NodeId(ObjectIds.BaseVariableType): ua.Variant,     ua.NodeId(ObjectIds.DiagnosticInfo): ua.DiagnosticInfo, }   def get_class_from_nodeid(datatype_nodeid):     # check all type dicts       sources = [base_type_list, ua.basetype_by_datatype, ua.extension_objects_by_datatype, ua.enums_by_datatype]    cls = None    for src in sources:        cls = src.get(datatype_nodeid, None)        if cls is not None:           return cls           return cls

get_class_from_nodeid 函数首先查找 base_type_list,然后依次查找 ua.basetype_by_datatype、ua.extension_objects_by_datatype 和 ua.enums_by_datatype。这些字典包含了 NodeId 到 Python 类的映射关系。

动态构建参数并调用方法

有了参数类型信息和类型映射,我们可以动态构建参数并调用方法。

async def call_method_with_dynamic_args(client, method_node, arg_values):     """     使用动态参数调用 OPC UA 方法。      Args:         client: asyncua.Client 实例。         method_node: 方法节点的 Node 对象。         arg_values: 参数值的列表。      Returns:         方法调用的结果。     """     arguments = await get_method_input_arguments(client, method_node)     args = []     for i, arg in enumerate(arguments):         data_type_class = get_class_from_nodeid(arg.DataType)         if data_type_class is None:             raise ValueError(f"不支持的数据类型: {arg.DataType}")          # 将参数值转换为正确的类型         try:             converted_value = data_type_class(arg_values[i]) if data_type_class != bool else bool(arg_values[i]) # bool类型转换需要特殊处理         except Exception as e:             raise ValueError(f"参数转换失败: {e}")          args.append(converted_value)      # 调用方法     result = await method_node.call_method(*args)     return result  # 示例用法 async def main():     async with Client("opc.tcp://your_opcua_server:4840") as client:         method_node = await client.get_node("ns=2;i=1234")  # 替换为你的方法节点 ID         arg_values = [99, 100, 199]  # 替换为你的参数值         try:             result = await call_method_with_dynamic_args(client, method_node, arg_values)             print(f"方法调用结果: {result}")         except Exception as e:             print(f"方法调用失败: {e}")  if __name__ == "__main__":     import asyncio     asyncio.run(main())

在这个例子中,call_method_with_dynamic_args 函数首先获取方法的参数信息,然后根据参数类型将传入的参数值转换为正确的类型,最后调用方法并返回结果。

注意事项

  • 错误处理: 在实际应用中,需要添加更完善的错误处理机制,例如处理不支持的数据类型、参数转换失败等情况。
  • 自定义数据类型: 对于复杂的自定义数据类型,可能需要更复杂的转换逻辑。
  • 性能优化 频繁调用 get_method_input_arguments 可能会影响性能,可以考虑缓存参数信息。

总结

通过动态检测 OPC UA 方法的输入参数类型,并进行相应的类型转换,我们可以编写更加灵活和通用的 OPC UA 客户端程序。这对于需要与各种不同的 OPC UA 服务端交互的应用程序来说尤其重要。 本文提供了一种实现动态参数类型转换的基本方法,开发者可以根据自己的实际需求进行扩展和优化。

© 版权声明
THE END
喜欢就支持一下吧
点赞10 分享