currentcost/perl/nupub.pl
Dale Lane 9302406fbc
2008-11-26 21:40:59 +00:00

376 lines
8.6 KiB
Raku

# new micro pub (nupub.pl)
# Andy S-C 16-Jun-05
# new version 5-May-06
# publishes a series of mqtt messages to a broker in the most unobtrusive,
# yet still readable, way I can think of.
# only does qos 0
# can do LWT
# if clientid is null (""), it makes a random one for you
################################################################################
# {optional}
# upub_lwt ($keepalive,$topic,$qos,$retain,$content);
# upub_connect ($clientid,$broker,$port)
# upub_publish ($topic,$retain,$content)
# {optional, though desirable if you set LWT}
# upub_disconnect
# semantic oddities
# upub_publish will attempt to reconnect if it detects a problem with the connection
# upub_connect with empty parameter list is "reconnect" to where you connected earlier
# NB do not call it with the ampersand sigil and NO parentheses (i.e. "&upub_connect;") ...
# in Perl that form passes the calling function's own @_ (its argument list)
# into connect, which is NOT what you want, believe me!
# so it must be called like this: "upub_connect()"... though as it's already in publish
# you almost certainly won't need to anyway!
# global variables:
# $upub_keepalive, $upub_flags, and $upub_strings,
# $upub_clientid, $upub_broker, $upub_port
################################################################################
# test:
# nice simple example
if (0)
{
    # Minimal usage (disabled; change the 0 to 1 to run it): connect, publish ten
    # messages two seconds apart, then disconnect.
    if (upub_connect("asc", "10.0.0.14", 1883))
    {
        $i = 0;
        while ($i < 10)
        {
            upub_publish("test", "n", "hello Andy [$i] $$");
            sleep 2;
            $i++;
        }
    }
    # the disconnect is sent whether or not the connect worked
    upub_disconnect();
}
# more complicated example, with LWT and forcing a reconnect
if (0)
{
    # Fuller usage (disabled; change the 0 to 1 to run it): register a will, connect,
    # publish ten messages with a long pause after message 4, then disconnect.
    upub_lwt(5, "lwt", 0, 0, "nupub died");
    if (!upub_connect("asc", "10.0.0.14", 1883))
    {
        print "couldn't connect\n";
    }
    else
    {
        $i = 0;
        while ($i < 10)
        {
            print "sending message $i\n";
            my $sent = upub_publish("test", "n", "hello Andy [$i] $$");
            if (!$sent)
            {
                print "message $i may not have got through\n";
                # do NOT republish here: qos0 means AT MOST once, and there is
                # a CHANCE that the first attempt did get through
            }
            if ($i == 4)
            {
                # sleep for longer than the keepalive given to upub_lwt above
                print "going to sleep....";
                sleep 10;
                print "awake\n";
            }
            sleep 2;
            $i++;
        }
        sleep 5;
        # with the ping below left commented out, the LWT should fire
        #&upub_ping;
        sleep 5;
        upub_disconnect();
    }
}
################################################################################
use Socket;
# ignore SIGPIPE, so that a write to a dead socket shows up as a failed print
# rather than as a signal delivered to the process
$SIG{PIPE} = 'IGNORE';
# $upub_keepalive, $upub_flags, and $upub_strings are global
sub upub_lwt
{
    # Record a Last Will and Testament for the next upub_connect.
    #
    # Parameters:
    #   $keepalive - keepalive value placed in the CONNECT message
    #   $topic     - will topic
    #   $qos       - will qos (we only publish at qos 0 ourselves, but the broker
    #                can deliver the will at a higher qos)
    #   $retain    - "y"/"Y" or 1 to set the will-retain flag
    #   $content   - will message
    #
    # Nothing is sent here. The values are parked in the globals $upub_keepalive,
    # $upub_flags and $upub_strings, which upub_connect reads. Left unset, those
    # globals already mean "no will", so no separate "LWT set" flag is needed.
    my ($keepalive, $topic, $qos, $retain, $content) = @_;

    my $will_retain = 0;
    if (lc($retain) eq "y" || $retain == 1)
    {
        $will_retain = 1;
    }

    $upub_keepalive = $keepalive;

    # connect flag bits: x, x, will retain, will qos (2 bits), will, (cleanstart, 0)
    # upub_connect ORs this value into its connect flags byte
    $upub_flags = $will_retain * 32 + $qos * 8 + 4;

    # will topic then will message, each length-prefixed by UTF;
    # upub_connect appends this after the client id in the payload
    $upub_strings = join('', UTF($topic), UTF($content));
}
################################################################################
# returns true if connected, false if not
sub upub_connect
{
    # Open a TCP connection to an MQTT broker and send the CONNECT message.
    #
    # Parameters:
    #   $clientid - MQTT client id; if it is empty while broker and port are
    #               given, a pseudo-random one is generated
    #   $broker   - broker host name or dotted-quad address
    #   $port     - broker TCP port number
    # Called with an empty parameter list, i.e. "upub_connect()", it reconnects
    # with the values remembered from the previous call.
    #
    # Reads the globals $upub_flags, $upub_keepalive and $upub_strings (set by
    # upub_lwt; if never set they give connect flags of 2 and a keepalive of 0)
    # and stores the parameters in $upub_clientid, $upub_broker and $upub_port.
    #
    # Returns true if the socket connected and the CONNECT message was written,
    # 0 otherwise. Nothing is ever read back from the socket here, so "true" does
    # not prove that the broker accepted the connection.
    my ($clientid,$broker,$port) = @_;
    my ($fixed,$variable,$payload,$msg,$rc,$addr,$previous);

    # if only the client id is null
    if (!$clientid && $broker && $port)
    {
        # make a random client id:
        # upub_{time mod 1000}_{random 0-99}_{process id mod 1000}
        $clientid = "upub_".(time %1000)."_".int(rand(100))."_".($$ % 1000);
    }

    # if there are NO parameters
    if (@_ == 0)
    {
        # it's a reconnect, use the parameters we used last time
        $clientid = $upub_clientid;
        $broker = $upub_broker;
        $port = $upub_port;
        print "(reconnect)\n";
    }
    else
    {
        # store the parameters we used, in case there's a reconnect
        $upub_clientid = $clientid;
        $upub_broker = $broker;
        $upub_port = $port;
        print "connect\n";
    }

    # no destination (e.g. a reconnect before any connect): report failure
    # instead of handing undefined values to the socket calls
    if (!defined($broker) || $broker eq "" || !defined($port))
    {
        return 0;
    }

    # inet_aton returns undef for a host it cannot resolve, and sockaddr_in
    # refuses an undefined address, so treat that as "couldn't connect"
    $addr = inet_aton($broker);
    if (!defined($addr))
    {
        return 0;
    }

    # construct mqtt message
    ###########
    # connect #
    ###########
    # fixed header: msg type (4) dup (1) qos (2) retain (1)
    $fixed = chr(1*16);
    # variable header: protocol name, protocol version
    $variable = &UTF("MQIsdp").chr(3);
    # connect flags: clean start (2) ORed with whatever upub_lwt left in
    # $upub_flags; an unset $upub_flags counts as 0, so no "LWT set" test is needed
    $variable .= chr(2 | $upub_flags);
    # keepalive (2 bytes, high byte first)
    $variable .= chr($upub_keepalive/256) . chr($upub_keepalive%256);
    # payload: client ID, then the will topic/message if upub_lwt set them
    $payload = &UTF($clientid) . $upub_strings;
    # add in the remaining length field and fix it together
    $msg = $fixed . &remaining_length(length($variable)+length($payload)) . $variable . $payload;

    if (socket(upub_S, PF_INET, SOCK_STREAM, getprotobyname('tcp')))
    {
        if (connect(upub_S, sockaddr_in($port,$addr)))
        {
            # turn on autoflush for the socket, then hand the default output
            # handle back to whatever the caller had selected
            $previous = select(upub_S); $|=1;
            select($previous);
            # print returns true if successful
            $rc = print upub_S $msg;
            return $rc;
        }
        # the connect failed: don't leave the unconnected socket open
        close(upub_S);
    }
    return 0;
}
################################################################################
# returns true if the write is "successful" (i.e. if the write to the socket works)
sub upub_publish
{
    # Publish one message at qos 0 on the socket handle upub_S.
    #
    # Parameters:
    #   $topic   - topic name
    #   $retain  - "y"/"Y" or 1 to set the retain flag, anything else leaves it clear
    #   $content - message payload, sent as given
    #
    # Returns true if the write to the socket worked, false otherwise. After a
    # failed write a reconnect is attempted, but this message is NOT sent again
    # and the return value stays false.
    my ($topic,$retain,$content) = @_;
    my ($qos) = 0;
    # $rc is declared here so that a publish no longer overwrites the global $rc
    my ($fixed,$variable,$payload,$msg,$rc);
    ###########
    # publish #
    ###########
    # fixed header: msg type (4) dup (1) qos (2) retain (1)
    $fixed = chr(3*16 + $qos*2 + ( (lc($retain) eq "y" || $retain==1) ?1:0));
    # variable header: just the topic; $qos is always 0 here, so nothing
    # qos-dependent is ever added after it
    $variable = &UTF($topic);
    # payload
    $payload = $content;
    $msg = $fixed . &remaining_length(length($variable)+length($payload)) . $variable . $payload;
    # send it
    # print returns true if successful
    $rc = print upub_S $msg;
    if (!$rc)
    {
        # connect with empty parameters is "reconnect"
        &upub_connect();
    }
    return $rc;
}
################################################################################
# returns true if sent successfully, but not sure how useful that is!
sub upub_disconnect
{
    # Send the MQTT DISCONNECT message on the socket handle upub_S, then close it.
    #
    # Takes no parameters.
    # Returns the result of the write: true if the message was sent successfully,
    # false if not. The handle is closed in both cases.
    my ($fixed,$msg,$rc);
    ##############
    # disconnect #
    ##############
    # fixed header: msg type (4) dup (1) qos (2) retain (1)
    $fixed = chr(14*16);
    # followed by a remaining length of 0
    $msg = $fixed . chr(0);
    # remember the result instead of returning straight away, otherwise the
    # pause and the close below are never reached
    $rc = print upub_S $msg;
    if ($rc)
    {
        # sleep for a bit to make sure the socket doesn't close before the message arrives
        select undef,undef,undef,0.5;
    }
    close upub_S;
    return $rc;
}
################################################################################
# if we're doing LWT, we should include PING for completeness
# returns true if it was sent successfully, but that's not really an indicator
# of connectedness - you really need to wait for the pong to come back
sub upub_ping
{
    # Write an MQTT PINGREQ to the socket handle upub_S.
    #
    # Takes no parameters.
    # Returns whatever print returns: true if the two bytes were written.
    #
    # The reply (the pong) is never read back out of the socket, so replies
    # simply pile up unread for as long as the connection lives.

    # byte 1: message type 12 in the top four bits, dup/qos/retain all zero
    # byte 2: remaining length of 0
    my $pingreq = pack("C2", 12 * 16, 0);
    my $written = print upub_S $pingreq;
    return $written;
}
################################################################################
sub UTF
{
    # Return the parameter as an MQTT string: a two-byte length (high byte
    # first) followed by the string itself as UTF-8 bytes.
    #
    # Parameter:
    #   $string - the text to encode; undef is treated as the empty string
    # Returns the length-prefixed byte string.
    #
    # NOTE: the two-byte prefix can only describe up to 65535 bytes; longer
    # input is not checked for here.
    my ($string) = @_;
    $string = "" unless defined($string);
    # a Perl character string has to become UTF-8 bytes first, so that the
    # prefix counts the bytes that really go on the wire; a string that is
    # already bytes is passed through untouched
    utf8::encode($string) if utf8::is_utf8($string);
    my ($length) = length($string);
    return chr(int($length/256)).chr($length%256).$string;
}
################################################################################
sub remaining_length
{
    # Encode an integer as the bytes of an MQTT "remaining length" field.
    #
    # Parameter:
    #   $x - the length to encode
    # Returns a string of one or more bytes: seven bits of the value per byte,
    # lowest seven bits first, with the top bit set on every byte except the last.
    my ($x) = @_;
    my @field;
    while (1)
    {
        my $low7 = $x % 128;
        $x = int($x / 128);
        if ($x > 0)
        {
            # more digits still to come: flag this one with the top bit
            push @field, chr($low7 | 0x80);
        }
        else
        {
            push @field, chr($low7);
            last;
        }
    }
    return join('', @field);
}
################################################################################
1