This commit is contained in:
Dale Lane 2008-11-26 21:40:59 +00:00
parent 46b1663813
commit 9302406fbc
10 changed files with 823 additions and 0 deletions

271
perl/CurrentCost.pm Normal file
View file

@ -0,0 +1,271 @@
# Publish CurrentCost power information to a nano broker
# Round most values and only publish if data changes
#
# Anton Piatek 04/2008
package CurrentCost;
require "nupub.pl";
sub new
{
my $class = shift || die;
my %params = @_;
die "broker not set" unless $params{broker};
die "name not set" unless $params{name};
my $port = $params{port} || 1883;
my $keepalive = $params{keepalive} || 120;
my $debug = $params{debug} || 0;
my $self = {
_broker => $params{broker},
_name => $params{name},
_port => $port,
_keepalive => $keepalive,
_debug => $debug,
_prev_watt_value => -1,
_prev_temperature => 0,
_watt_skipped => 0,
_prev_hours => 0,
_prev_days => 0,
_prev_months => 0,
_prev_years => 0,
};
bless $self, $class;
return $self;
}
sub connect
{
my $self = shift || die;
if (&upub_connect ("currentcost_$self->{_name}",$self->{_broker},$self->{_port}))
{
if($self->{_debug}){
print "connected OK\n";
}
$self->{_connected}=1;
return 1;
}else{
print "couldnt connect\n";
$self->{_connected}=0;
return 0;
}
}
sub publish_power
{
my $self = shift || die;
my $value = shift || die "no value to publish";
if(! $self->{_connected}){
$connect = $self->connect();
if(!$connect){
print "Not connected, not publishing\n";
return 0;
}
}
# round to the nearest 50W and convert to KW
$value = int($value/50 + 0.5)*50;
$value /= 1000;
if ($value != $self->{_prev_watt_value})
{
if ($self->{_watt_skipped} >= 10)
{
#If any were skipped, republish the prev_value just before
#publishing the new value so a long flat-line is graphed
$self->{_watt_skipped} = 0;
if($self->{_debug}){
print "pub: '$self->{_prev_watt_value}' KW\n";
}
&upub_publish ("PowerMeter/CC/$self->{_name}","n",$self->{_prev_watt_value});
}
&upub_publish ("PowerMeter/CC/$self->{_name}","n",$value);
if($self->{_debug}){
print "pub: '$value' KW\n";
}
$self->{_prev_watt_value} = $value;
}
else
{
$self->{_watt_skipped}++;
if ($self->{_watt_skipped} == 10)
{
print "60 second flat-line marker\n";
}
}
}
sub publish_temp
{
my $self = shift || die;
my $value = shift || die "no value to publish";
if(! $self->{_connected}){
$connect = $self->connect();
if(!$connect){
print "Not connected, not publishing\n";
return 0;
}
}
# round to nearest degree
$value = int($value + 0.5);
if ($value != $self->{_prev_temperature})
{
if($self->{_debug}){
print "pub temp: $value\n";
}
$self->{_prev_temperature} = $value;
print "temperature: $value\n";
&upub_publish ("PowerMeter/temp/$self->{_name}","y",$value);
}
}
sub publish_history
{
my $self = shift || die;
local $_ = shift || die "no xml";
if(! $self->{_connected}){
$connect = $self->connect();
if(!$connect){
print "Not connected, not publishing\n";
return 0;
}
}
$hours = &process_history(/<hrs>(.*)<\/hrs>/);
$days = &process_history(/<days>(.*)<\/days>/);
$months = &process_history(/<mths>(.*)<\/mths>/);
$years = &process_history(/<yrs>(.*)<\/yrs>/);
if( $hours ne $self->{_prev_hours} )
{
if($self->{_debug}){
print "pub: hours:\t$hours\n";
}
$self->{_prev_hours} = $hours;
&upub_publish ("PowerMeter/history/$self->{_name}/hours","y",$hours);
}
if( $days ne $self->{_prev_days} )
{
if($self->{_debug}){
print "pub: days: $days\n";
}
$self->{_prev_days} = $days;
&upub_publish ("PowerMeter/history/$self->{_name}/days","y",$days);
}
if($months ne $self->{_prev_months} )
{
if($self->{_debug}){
print "pub: months:\t$months\n";
}
$self->{_prev_months} = $months;
&upub_publish ("PowerMeter/history/$self->{_name}/months","y",$months);
}
if($years ne $self->{_prev_years} )
{
if($self->{_debug}){
print "pub: years: $years\n";
}
$self->{_prev_years} = $years;
&upub_publish ("PowerMeter/history/$self->{_name}/years","y",$years);
}
}
sub publish_all
{
my $self = shift || die;
my $xml = shift || die "no xml";
if ($xml =~ /<ch1><watts>(.*?)<\/watts>/){
$self->publish_power($1);
}
if ($xml =~ /<tmpr>(.*)<\/tmpr>/){
$self->publish_temp($1);
}
if ($xml =~ /<hist>(.*)<\/hist>/)
{
$self->publish_history($1);
}
}
sub process_history
{
my ($data) = @_;
my ($result);
#print "data: '$data'\n";
while ($data =~ /<(.)(.*?)>(.*?)<\/.*?>/g)
{
($type,$index,$value) = ($1,$2,$3);
#print "$type $index\t$value\n";
$result .= $value." ";
}
#print $result,"\n";
return $result;
}
sub disconnect
{
my $self = shift||die;
if($self->{_debug}){
print "disconnecting\n";
}
&upub_disconnect();
}
sub DESTROY
{
my $self = shift||die;
$self->disconnect();
}
1;

4
perl/broker.cfg Normal file
View file

@ -0,0 +1,4 @@
connection xxx_realtime
addresses 204.146.213.96:1883
topic PowerMeter/# out

31
perl/currentcost.pl Normal file
View file

@ -0,0 +1,31 @@
# read the currentcost meter and publish the power in KW, to the nearest
# 100 watts, to an MQTT broker, along with temperature and history data.
# Andy S-C 16-Sep-07
########################################################################
$your_name = "yourname";
$serial_port = "/dev/ttyUSB0";
$broker = "127.0.0.1";
#$broker = "realtime.ngi.ibm.com";
########################################################################
use CurrentCost;
while (1)
{
open (SERIAL,"+<$serial_port") || die "can't open $serial_port";
$cc = new CurrentCost(broker=>$broker,name=>$your_name,debug=>1);
while (<SERIAL>)
{
$xml = $_;
$cc->publish_all($xml);
}
}

376
perl/nupub.pl Normal file
View file

@ -0,0 +1,376 @@
# new micro pub (nupub.pl)
# Andy S-C 16-Jun-05
# new version 5-May-06
# publishes a series of mqtt messages to a broker in the most unobtrusive,
# yet still readable, way I can think of.
# only does qos 0
# can do LWT
# if clientid is null (""), it makes a random one for you
################################################################################
# {optional}
# upub_lwt ($keepalive,$topic,$qos,$retain,$content);
# upub_connect ($clientid,$broker,$port)
# upub_publish ($topic,$retain,$content)
# {optional, though desirable if you set LWT}
# upub_disconnect
# semantic oddities
# upub_publish will attempt to reconnect if it detects a problem with the connection
# upub_connect with empty parameter list is "reconnect" to where you connected earlier
# NB do not just call upub_connect with NO parameters (i.e. "upub_connect;" ) ...
# that does something horrible and you get the stack contents of the current function
# passed into connect, which is NOT what you want, believe me!
# so it must be called like this: "upub_connect()"... though as it's already in publish
# you almost certainly won't need to anyway!
# global variables:
# $upub_keepalive, $upub_flags, and $upub_strings,
# $upub_clientid, $upub_broker, $upub_port
################################################################################
# test:
# nice simple example
if (0)
{
if (&upub_connect ("asc","10.0.0.14",1883))
{
for ($i=0;$i<10;$i++)
{
&upub_publish ("test","n","hello Andy [$i] $$");
sleep 2;
}
}
&upub_disconnect;
}
# more complicated example, with LWT and forcing a reconnect
if (0)
{
&upub_lwt (5,"lwt",0,0,"nupub died");
if (&upub_connect ("asc","10.0.0.14",1883))
{
for ($i=0;$i<10;$i++)
{
print "sending message $i\n";
if (!&upub_publish ("test","n","hello Andy [$i] $$"))
{
print "message $i may not have got through\n";
# it's tempting to republish here, but we're not really allowed to
# as qos0 is AT MOST once, and there's a CHANCE the first one got through!
}
if ($i == 4)
{
print "going to sleep....";
sleep 10;
print "awake\n";
}
sleep 2;
}
sleep 5;
# if you comment this line out, it should fire LWT
#&upub_ping;
sleep 5;
&upub_disconnect;
}
else
{
print "couldn't connect\n";
}
}
################################################################################
use Socket;
# disable the signal handler for sig pipe
$SIG{'PIPE'}='IGNORE';
# $upub_keepalive, $upub_flags, and $upub_strings are global
sub upub_lwt
{
my ($keepalive,$topic,$qos,$retain,$content) = @_;
# note that although WE can't do qos>0, the broker, can, so we can set qos on LWT
$upub_keepalive = $keepalive;
# LWT flags: x, x, ret, qos qos, will, (cleanstart, 0)
$upub_flags = ( (lc($retain) eq "y" || $retain==1) ?1:0)*32 + $qos * 8 + 4;
# now we can just bitwise OR this into the connect flags byte in connect
# will topic, will content, both UTF encoded
$upub_strings = &UTF($topic) . &UTF($content);
# append this to the end of the payload in connect
# if these variables are set, we'll use them, if not, their default values
# will do the right things for us (so we don't need a flag to say "LWT set")
}
################################################################################
# returns true if connected, false if not
sub upub_connect
{
my ($clientid,$broker,$port) = @_;
my ($fixed,$variable,$payload,$msg,$rc);
# if only the client id is null
if (!$clientid && $broker && $port)
{
# make a random client id:
# upub_{last 3 digits of time}_{random 1-99}_{process id last 3 digits}
$clientid = "upub_".(time %1000)."_".int(rand(100))."_".($$ % 1000);
}
# if there are NO parameters
if ($#_ == -1)
{
# it's a reconnect, use the parameters we used last time
$clientid = $upub_clientid;
$broker = $upub_broker;
$port = $upub_port;
print "(reconnect)\n";
}
else
{
# store the parameters we used, in case there's a reconnect
$upub_clientid = $clientid;
$upub_broker = $broker;
$upub_port = $port;
print "connect\n";
}
# construct mqtt message
###########
# connect #
###########
# fixed header: msg type (4) dup (1) qos (2) retain (1)
$fixed = chr(1*16);
# variable header: protocol name, protocol version
$variable = &UTF("MQIsdp").chr(3);
# cleverly, thanks to auto-vivification and guaranteed zeroing of new variables,
# we don't actually need to test if LWT has been set or not
# connect flags
$variable .= chr(2 | $upub_flags);
# keepalive (2 bytes)
$variable .= chr($upub_keepalive/256) . chr($upub_keepalive%256);
# payload: client ID
$payload = &UTF($clientid) . $upub_strings;
# add in the remaining length field and fix it together
$msg = $fixed . &remaining_length(length($variable)+length($payload)) . $variable . $payload;
if (socket(upub_S, PF_INET, SOCK_STREAM, getprotobyname('tcp')))
{
if (connect(upub_S, sockaddr_in($port,inet_aton($broker))))
{
select upub_S; $|=1;
select STDOUT;
# print returns true if successful
$rc = print upub_S $msg;
return $rc;
}
}
return 0;
}
################################################################################
# returns true if write if "successful" (i.e. if write to socket works)
sub upub_publish
{
my ($topic,$retain,$content) = @_;
my ($qos) = 0;
my ($fixed,$variable,$payload,$msg);
###########
# publish #
###########
# fixed header: msg type (4) dup (1) qos (2) retain (1)
$fixed = chr(3*16 + $qos*2 + ( (lc($retain) eq "y" || $retain==1) ?1:0));
# variable header
$variable = &UTF($topic);
# NOT POSSIBLE!!! This is defensive coding gone too far!
#if ($qos == 1)
#{
# $variable .= chr(0).chr(1);
#}
# payload
$payload = $content;
$msg = $fixed . &remaining_length(length($variable)+length($payload)) . $variable . $payload;
# send it
# print returns true if successful
# if unsuccessful, try to reconnect
$rc = print upub_S $msg;
if (!$rc)
{
# connect with empty parameters is "reconnect"
&upub_connect();
}
return $rc;
}
################################################################################
# returns true if sent successfully, but not sure how useful that is!
sub upub_disconnect
{
my ($fixed,$msg);
##############
# disconnect #
##############
if (1)
{
# fixed header: msg type (4) dup (1) qos (2) retain (1)
$fixed = chr(14*16);
$msg = $fixed . chr(0);
}
return (print upub_S $msg);
# sleep for a bit to make sure the socket doesn't close before the messages arrives
select undef,undef,undef,0.5;
close upub_S;
}
################################################################################
# if we're doing LWT, we should include PING for completeness
# returns true if it was sent successfully, but that's not really an indicator
# of connectedness - you really need to wait for the pong to come back
sub upub_ping
{
my ($fixed,$msg);
##############
# pingreq #
##############
# fixed header: msg type (4) dup (1) qos (2) retain (1)
$fixed = chr(12*16);
$msg = $fixed . chr(0);
return (print upub_S $msg);
# note we don't make any attempt to read the pong out of the socket
# so eventually, something horrible might happen (TCP fills up and the
# Internet stops, or similar).
}
################################################################################
sub UTF
{
# return the UTF-8 encoded version of the parameter
my ($string) = @_;
my ($length) = length($string);
return chr($length/256).chr($length%256).$string;
}
################################################################################
sub remaining_length
{
# return the remaining length field bytes for an integer input parameter
my ($x) = @_;
my ($rlf,$digit);
do
{
$digit = $x % 128;
$x = int($x/128);
# if there are more digits to encode, set the top bit of this digit
if ( $x > 0 )
{
$digit |= 0x80;
}
$rlf .= chr($digit);
} while ($x > 0);
return $rlf;
}
################################################################################
1

96
perl/read.me Normal file
View file

@ -0,0 +1,96 @@
CurrentCost publishing application
This application reads data from the serial port of a currentcost meter, and
publishes some of the data to a messaging system on the Internet so it can be
graphed and viewed. The messaging system is the IBM MQ Telemetry Transport
(http://mqtt.org).
Andy Stanford-Clark 2008
initial setup for Linux
-----------------------
change set_serial to refer to your serial port (most likely /dev/ttyUSB0)
change currentcost.pl
your name (e.g. andy)
identify the serial port (as you put in set_serial)
in broker.cfg
change the xxx on the first line to your initials (number of characters doesn't have to be 3!)
e.g. ASC_realtime
this is to uniquely identify your nanobroker to the broker on the internet
Download RSMB (Really Small Message Broker) from the IBM AlphaWorks site
http://alphaworks.ibm.com/tech/rsmb
Follow the instructions included with RSMB to get it up and running.
Note that you will have to chmod +x the appropriate broker executable for your platform,
so you can run it.
Note that broker.redhat also works on other Intel platform linux distributions
such as Debian and Ubuntu.
*** copy the broker config file, broker.cfg, into the RSMB directory ***
to start things running:
start RSMB...
./broker.redhat
(or whichever you are using)
or to run it in the background:
nohup ./broker.redhat >> /dev/null &
start the currentcost application
./start_cc
To see the published data, subscribe (e.g. using the java MQTT gui client (IA92 supportpac,
linked from mqtt.org)) to PowerMeter/CC/yourname (where "yourname" is the name you set in currentcost.pl) to see the power data and PowerMeter/temp/yourname to see the temperature.
Instructions for Windows
------------------------
You need perl on your machine - download free from http://activeperl.com
The perl support for serial ports is somewhat broken on Windows, so we use
a little C program, sread.exe to read the serial port, and then feed that
data into the perl application.
Modify win_currentcost.pl and broker.cfg as described above for linux, to put your own
name into the two files.
Download and install RSMB as described above, and in the RSMB documentation.
*** copy broker.cfg into the RSMB directory ***
Start RSMB in a command window with...
broker
In another command window start the currentcost bat file with...
win_currentost
Any problems or questions to andysc: andysc@uk.ibm.com or @andysc on twitter.

4
perl/set_serial Normal file
View file

@ -0,0 +1,4 @@
stty speed 9600 < /dev/ttyUSB0
stty -echo < /dev/ttyUSB0
stty raw < /dev/ttyUSB0

BIN
perl/sread.exe Normal file

Binary file not shown.

3
perl/start_cc Normal file
View file

@ -0,0 +1,3 @@
. ./set_serial
nohup perl currentcost.pl >> /dev/null &

6
perl/win_currentcost.bat Normal file
View file

@ -0,0 +1,6 @@
sread com2 -r 9600 | perl win_currentcost.pl

32
perl/win_currentcost.pl Normal file
View file

@ -0,0 +1,32 @@
# read the currentcost meter and publish the power in KW, to the nearest
# 100 watts, to an MQTT broker, along with temperature and history data.
# Andy S-C 16-Sep-07
########################################################################
$your_name = "yourname";
#$serial_port = "/dev/ttyUSB0";
$broker = "127.0.0.1";
#$broker = "realtime.ngi.ibm.com";
########################################################################
use CurrentCost;
while (1)
{
#open (SERIAL,"+<$serial_port") || die "can't open $serial_port";
$cc = new CurrentCost(broker=>$broker,name=>$your_name,debug=>1);
while (<>)
{
$xml = $_;
$cc->publish_all($xml);
}
}