#!/usr/bin/python # program to interogate 1-wire bus and report all known temperatures # with some commentary # by ian # This code has no warranty, including any implied fitness for purpose # or implied merchantability. It's free, and worth every penny. # standard library modules used in code import sys import time import argparse # statistics from scipy from scipy import stats # use OWFS module import ow # set up parser with command summary parser = argparse.ArgumentParser( description='Rolling 1-wire temperatures report') # set up arguments with associated help and defaults parser.add_argument('-i', dest='sample_interval', help='interval in seconds between samples', default='30') parser.add_argument('-t', dest='trend_time', help='time in seconds over which trend is calculated', default='300') # process the arguments args=parser.parse_args() # turn the arguments into numbers sample_interval=float(args.sample_interval) trend_time=int(args.trend_time) # tell the user what is happening print "1 wire bus reports" print " sample interval: "+str(sample_interval) + " seconds" print " report trend over "+ str(trend_time) + " seconds" # connect to localhost port 4304 where owserver should be running ow.init('4304'); # now determine what sensors we will consider print "locate sensors and take initial readings:" # every sensor on the bus added to the list list = ow.Sensor('/').sensorList() # create lists for teh data we will accumulate sensorlist=[] sensordata=[] readtimes=[] # get time in seconds now tt=int(time.time()) # number of temperature sensors found so far n=0 for s in list: sys.stdout.write(" consider " + str(s) + ": ") if 'temperature' in s.entryList(): # get teh temperature from that sensor T = s.temperature # increment count of sensors n = n + 1 # tell the user teh good news sys.stdout.write("T: " + T) # record that as a sensor to interrogate sensorlist.append(s) # set found sensors to use uncached results # so will run new conversion every read cycle s.useCache( False ) # record teh time of reading and value read sensordata.append([float(T)]) readtimes.append([tt]) else: # give the user the bad news sys.stdout.write("no temperature value") print print "bus search done - " + str(n) + " temperature sensors found" print # now loop forever reading the identified sensors while 1: # determine how long to wait until next interval # note this will skip some if specified interval is too short # - it finds the next after now, not next after last sleeptime = sample_interval - (time.time() % sample_interval) time.sleep(sleeptime) # get time now and record it tt = int(time.time()) # we only record to integer seconds # show the time print (str(tt)) # work through list of sensors for s in sensorlist: n = sensorlist.index(s) # the array register of this sensor try: # just in case it has been unplugged T=float(s.temperature) except ow.exUnknownSensor: # it has been unplugged print ' sensor ' + str(s) + ' gone away - just ignore' continue # so we'll jump to teh next in the list # record value and time of this reading sensordata[n].append(T) readtimes[n].append(tt) # but we only want to keep data that's not too old # so drop any values older than trend_time value while ((tt-readtimes[n][0]) > trend_time): readtimes[n].pop(0) sensordata[n].pop(0) # print sensor name and current value print ' {!s}: {:-6.2f}'.format(s,T) # if we have at least three samples, print aggregate info if len(readtimes[n]) >= 3: slope, intercept, r_value, p_value, std_err = \ stats.linregress(readtimes[n],sensordata[n]) print ' max {:-6.2f}, min {:-6.2f}, avg {:-6.2f}'.format( \ max(sensordata[n]),min(sensordata[n]), \ sum(sensordata[n])/len(sensordata[n])) print ' trend {:-8.4f} per second'.format(slope) print ' from {:d} samples over {:d} seconds'.format( \ len(sensordata[n]),max(readtimes[n])-min(readtimes[n])) print