本文摘自西门子论坛,作者plectrum
读DB区
使用的方法依旧是read_area
def read_area(self, area, dbnumber, start, size)
area是PLC内各寄存区的程式码,具体参考下表
dbnumber是DB块编号,可以在DB块属性中常规选项中检视到,start为寄存区起始地址,size是DB块完整的长度,因为DB块中可以有各种资料型别,所以长度引数一定要准确,否则返回资料会出现问题。
要访问DB块,必须取消DB块属性中的优化的块访问选项
针对的是DB块,所有area引数需要使用0x84,size引数需要通过DB块内的资料数量和资料型别具体计算出来
因为read_area方法返回的是byteArray型别的结果,可以通过get_bool,get_int,get_read,get_dword方法直接将byteArray型别转换成对应的资料型别
比如针对get_bool方法
def get_bool(_bytearray, byte_index, bool_index)
第一个bytearray引数是read_area的返回值,byte_index引数是DB块中变数的偏移量,bool_index引数是位变数的地址。
先来看一个简单的例子,DB块中有两个变数,一个浮点数,一个布林量
main函式中的程式码如下
def main():
s71200 = snap7.client.Client()
connect(s71200, \'192.168.2.110\', 0, 1) while True: try:
data = s71200.read_area(0x84, 1, 0, 5)#此处5代表DB块中资料长度,浮点数4个byte,布林量一个byte
print(data)
value = get_real(data, 0)#此处0是浮点数的偏移量
value2 = get_bool(data, 4, 0)#此处4是布林量的偏移量
print(value, value2)
sleep(5) except Snap7Exception as e:
connect(s71200, \'192.168.2.110\', 0, 1)if __name__ == \'__main__\':
main()
执行后如下
但是这样操作很麻烦,需要提前计算好DB块中的资料长度,需要将每个资料的偏移量和资料型别都填好,这里希望有一个简单的方法,定义好DB块的资料名称,资料型别,资料偏移量,之后执行程式后,自动计算长度,对应资料型别和偏移量,从而获取正确的结果。
DB块中的资料名称及资料型别,偏移量如下图
定义DB块资料格式
db = """
DB001 Real 0.0
DB002 Bool 4.0
DB003 Int 6.0
DB004 String 8.0
DB005 Real 264.0
"""
资料格式定义好后,需要从这个资料格式中得到read_area方法要的引数,这里通过字典是很容易实现的,我们可以定义一个字典的列表,列表中每一个字典都是一个数据,字典的键值可以是name,datatype,bytebit,资料结构如下
[
{\'name\': \'DB001\', \'datatype\': \'Real\', \'bytebit\': \'0.0\'},
{\'name\': \'DB002\', \'datatype\': \'Bool\', \'bytebit\': \'4.0\'},
{\'name\': \'DB003\', \'datatype\': \'Int\', \'bytebit\': \'6.0\'},
{\'name\': \'DB004\', \'datatype\': \'String\', \'bytebit\': \'8.0\'}
{\'name\': \'DB005\', \'datatype\': \'Real\', \'bytebit\': \'264\'}
]
之后可以通过for方法依次取出每一个键值,传递到read_area方法中。
定义DBRead()函式
这里希望DBRead函式中的引数如下程式码所示:
def DBRead(dev,db_num,db_len,db_items):
dev引数是装置名称,db_num是DB块编号,db_len是DB块中所有资料总长度,db_items是DB块中资料,它的资料结构应该和上图所示的字典列表一致。
首先通过read_area获取DB块中的资料,程式码如下
data = plc.read_area(0x84,db_num,0,db_len):
通过for循环以此取出db_items中的字典,通过键判断资料型别和偏移量,之后呼叫对应的get方法,程式码如下
for item in db_items:
value = None
#取出资料的偏移量
offset = int(item[\'bytebit\'].split(\'.\')[0]) #通过键datatype获取资料型别,呼叫不同的get方法
if item[\'datatype\']==\'Real\':
value = get_real(data,offset) if item[\'datatype\']==\'Bool\':
bit =int(item[\'bytebit\'].split(\'.\')[1])
value = get_bool(data,offset,bit) if item[\'datatype\']==\'Int\':
value = get_int(data, offset) if item[\'datatype\']==\'String\':
value = get_string(data, offset)
完整DBRead函式的程式码如下
def DBRead(dev,db_num,db_len,db_items):
data = dev.read_area(0x84,db_num,0,db_len)
obj = DBObject() for item in db_items:
value = None
offset = int(item[\'bytebit\'].split(\'.\')[0]) if item[\'datatype\']==\'Real\':
value = get_real(data,offset) if item[\'datatype\']==\'Bool\':
bit =int(item[\'bytebit\'].split(\'.\')[1])
value = get_bool(data,offset,bit) if item[\'datatype\']==\'Int\':
value = get_int(data, offset) if item[\'datatype\']==\'String\':
value = get_string(data, offset)
obj.__setattr__(item[\'name\'], value) return obj
DBRead函式中还需要知道db_len的值,所以还需要定义一个函式去获得DB块中资料的总长度
定义get_db_len()函式
通过下图
可以看出,资料的总长度其实就是最后一个数据的偏移量的数值加上自生所占的资料长度,如图五个资料的总长度应该是264 + 4 = 268,所以get_db_len()函式只要找到偏移量最大的数,在获得这个偏移量对用的资料型别所占的内存长度,相加后就可以得到整个资料长度。
def get_db_len(db_items,bytebit,datatype):
db_items和DBRead函式中的一致,bytebit和datatype是db_items中字典的两个键名,通过
offset_str,datatype =[(x[bytebit]) for x in db_items],[x[datatype] for x in db_items]
获得偏移量和资料型别的列表,结果应该如下
offset_str = [\'0.0\',\'4.0\',\'6.0\',\'8.0\',\'264.0\'],datatype = [Real,Bool,Int,String,Real]
因为获取的offset值均为字串,所有没法正确获取最大值,需要将字串列表转换为整数的列表,程式码如下
for x in offset_str:
offset_int.append(int(x.split(\'.\')[0]))
之后通过index方法获得offset中最大值的索引值,通过此索引值获得datatype列表中的资料型别,程式码如下
idx = offset_int.index(max(offset_int)) #结果=4
为了得到不同资料型别占用内存的长度,需要先定义一个字典,可以通过不同的资料型别取出对应的内存长度,如下
offsets = { "Bool":2,"Int": 2,"Real":4,"DInt":6,"String":256}
之后便可获得DB块中资料总长度,如下
db_len = (max(offset_int)) + int(offsets[datatype[idx]]) #结果=264+4=268
完整的get_db_len()函式如下
def get_db_len(db_items,bytebit,datatype):
offset_int = []
offset_str,datatype =[(x[bytebit]) for x in db_items],[x[datatype] for x in db_items] for x in offset_str:
offset_int.append(int(x.split(\'.\')[0]))
idx = offset_int.index(max(offset_int)) #结果=4
db_len = (max(offset_int)) + int(offsets[datatype[idx]]) #结果=264+4=268
return db_len
至此,两个函式全部定义完了,接下来需要在主函式中构造db_items列表
主函式
通过如下的程式码构造db_items列表
itemlist = filter(lambda a: a!=\'\',db.split(\' \'))
space=\'\'
items = [
{ "name":x.split(space)[0], "datatype":x.split(space)[1], "bytebit":x.split(space)[2]
} for x in itemlist
]
通过上面的两个函式获得DB块中的资料
db_len = get_db_len(items, \'bytebit\', \'datatype\')
DB_obj = DBRead(s71200, 1, db_len, items)
print(DB_obj.DB001, DB_obj.DB002, DB_obj.DB003, DB_obj.DB004, DB_obj.DB005)
sleep(5)
最终结果如下
完整程式码
from time import sleepimport snap7from snap7.snap7exceptions import Snap7Exceptionfrom snap7.util import *
db = """
DB001 Real 0.0
DB002 Bool 4.0
DB003 Int 6.0
DB004 String 8.0
DB005 Real 264.0
"""offsets = { "Bool":2,"Int": 2,"Real":4,"DInt":6,"String":256}class DBObject(object):
passdef connect(device, ip, rack, slot):
while True: # check connection
if device.get_connected(): break
try: # attempt connection
device.connect(ip, rack, slot) except: pass
sleep(5)def DBRead(dev,db_num,db_len,db_items):
data = dev.read_area(0x84,db_num,0,db_len)
obj = DBObject() for item in db_items:
value = None
offset = int(item[\'bytebit\'].split(\'.\')[0]) if item[\'datatype\']==\'Real\':
value = get_real(data,offset) if item[\'datatype\']==\'Bool\':
bit =int(item[\'bytebit\'].split(\'.\')[1])
value = get_bool(data,offset,bit) if item[\'datatype\']==\'Int\':
value = get_int(data, offset) if item[\'datatype\']==\'String\':
value = get_string(data, offset, 256)
obj.__setattr__(item[\'name\'], value) return objdef get_db_len(db_items,bytebit,datatype):
offset_int = []
offset_str,datatype =[(x[bytebit]) for x in db_items],[x[datatype] for x in db_items] for x in offset_str:
offset_int.append(int(x.split(\'.\')[0]))
idx = offset_int.index(max(offset_int)) #结果=4
db_len = (max(offset_int)) + int(offsets[datatype[idx]]) #结果=264+4=268
return db_lendef main():
s71200 = snap7.client.Client()
connect(s71200, \'192.168.2.111\', 0, 1) while True: try:
itemlist = filter(lambda a: a != \'\', db.split(\' \'))
space = \' \'
items = [
{ "name": x.split(space)[0], "datatype": x.split(space)[1], "bytebit": x.split(space)[2]
} for x in itemlist
]
db_len = get_db_len(items, \'bytebit\', \'datatype\')
DB_obj = DBRead(s71200, 1, db_len, items)
print(DB_obj.DB001, DB_obj.DB002, DB_obj.DB003, DB_obj.DB004, DB_obj.DB005)
sleep(5) except Snap7Exception as e:
connect(s71200, \'192.168.2.111\', 0, 1)if __name__ == \'__main__\':
main()