[Rt-commit] r6260 - in MessageBus: lib lib/MessageBus lib/MessageBus/Cache t

audreyt at bestpractical.com audreyt at bestpractical.com
Tue Oct 24 19:53:40 EDT 2006


Author: audreyt
Date: Tue Oct 24 19:53:38 2006
New Revision: 6260

Added:
   MessageBus/Makefile.PL
   MessageBus/lib/
   MessageBus/lib/MessageBus/
   MessageBus/lib/MessageBus.pm
   MessageBus/lib/MessageBus/Cache/
   MessageBus/lib/MessageBus/Cache/DBM_Deep.pm
   MessageBus/lib/MessageBus/Cache/Memcached.pm
   MessageBus/lib/MessageBus/Cache/PlainHash.pm
   MessageBus/lib/MessageBus/Cacheable.pm
   MessageBus/lib/MessageBus/Pub.pm
   MessageBus/lib/MessageBus/Sub.pm
   MessageBus/t/
   MessageBus/t/basic.t

Log:
* MessageBus 0.01

Added: MessageBus/Makefile.PL
==============================================================================
--- (empty file)
+++ MessageBus/Makefile.PL	Tue Oct 24 19:53:38 2006
@@ -0,0 +1,20 @@
+# Build script for the MessageBus distribution (uses Module::Install).
+use strict;
+use inc::Module::Install;
+
+name        'MessageBus';
+license     'MIT';
+all_from    'lib/MessageBus.pm';    # remaining metadata is read from here
+
+# Run-time prerequisites.  Cache::Memcached is not listed: only the
+# optional Memcached backend loads it.
+requires    'DBM::Deep';            # MessageBus::Cache::DBM_Deep
+requires    'Data::UUID';           # publisher ids in MessageBus::Pub
+requires    'Class::InsideOut';     # object model of the bus, Pub and Sub
+requires    'File::Temp';           # default file of the DBM_Deep backend
+requires    'Time::HiRes';          # timestamps in MessageBus::Cacheable
+
+# Needed only by t/basic.t.
+build_requires 'Test::More';
+
+sign; WriteAll;

Added: MessageBus/lib/MessageBus.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus.pm	Tue Oct 24 19:53:38 2006
@@ -0,0 +1,140 @@
+package MessageBus;
+$MessageBus::VERSION = '0.01';
+
+use 5.005;
+use strict;
+use Class::InsideOut qw( public private register id );
+use MessageBus::Pub;
+use MessageBus::Sub;
+
+# Inside-out property: object id => cache backend used by that bus.
+private cache  => my %cache;
+
+# MessageBus->new( [$backend, @backend_args] )
+#
+# Builds a bus on top of the class MessageBus::Cache::$backend ($backend
+# defaults to 'PlainHash'); the remaining arguments are handed to that
+# class's constructor.  Dies if the backend module cannot be loaded.
+sub new {
+    my $class   = shift;
+    my $backend = shift || 'PlainHash';
+
+    my $id = id( my $self = register( bless \(my $s), $class ) );
+
+    # Load the backend named by the argument rather than by whatever the
+    # caller happens to have in $_.  The failure reason is copied out of
+    # the localized $@ before dying, because a die() issued while
+    # "local $@" is in effect can reach the caller with an empty $@ on
+    # older perls.
+    my $error;
+    {
+        local $@;
+        eval { require "MessageBus/Cache/$backend.pm"; 1 }
+            or $error = $@ || 'unknown error';
+    }
+    die "Cannot find backend module: MessageBus::Cache::${backend}: $error"
+        if defined $error;
+
+    $cache{$id} = "MessageBus::Cache::$backend"->new(@_);
+    return $self;
+}
+
+# $bus->publish(@channels)
+#
+# Returns a new MessageBus::Pub that publishes through this bus's cache.
+# Without arguments the single channel '' (the empty string) is used.
+sub publish {
+    my $self     = shift;
+    my @channels = @_ ? @_ : ('');
+    return MessageBus::Pub->new($cache{ id($self) }, @channels);
+}
+
+# $bus->subscribe(@channels)
+#
+# Returns a new MessageBus::Sub that reads through this bus's cache.
+# Without arguments the single channel '' (the empty string) is used.
+sub subscribe {
+    my $self     = shift;
+    my @channels = @_ ? @_ : ('');
+    return MessageBus::Sub->new($cache{ id($self) }, @channels);
+}
+
+1;
+
+__END__
+
+=head1 NAME
+
+MessageBus - Lightweight publish/subscribe messaging system
+
+=head1 SYNOPSIS
+
+    # A new message bus with the DBM::Deep backend
+    # (Other possible backends include Memcached and PlainHash)
+    my $bus = MessageBus->new(DBM_Deep => '/tmp/bus.db');
+
+    # A channel is any arbitrary string
+    my $channel = '#perl6';
+
+    # Register a new publisher (you can publish to multiple channels)
+    my $pub = $bus->publish("#perl6", "#moose");
+
+    # Publish a message (may be a complex object) to those channels
+    $pub->msg("This is a message");
+
+    # Register a new subscriber (you can subscribe to multiple channels)
+    my $sub = $bus->subscribe("#moose");
+
+    # Publish an object to channels
+    $pub->msg("This is another message");
+
+    # Simple get: Returns the messages sent since the previous get,
+    # but only for the first channel.
+    my @msgs = $sub->get;
+
+    # Simple get, with an explicit channel key (must be among the ones
+    # it initially subscribed to)
+    my @msgs = $sub->get("#moose");
+
+    # Complex get: Returns a hash reference from channels to array
+    # references of [timestamp, message].
+    my $hash_ref = $sub->get_all;
+
+=head1 DESCRIPTION
+
+This module provides a simple message bus for publishing messages and
+subscribing to them.
+
+Currently it offers three backends: C<DBM_Deep> for on-disk storage,
+C<Memcached> for possibly multi-host storage, and C<PlainHash> for
+single-process storage.
+
+Please see the tests in F<t/> for this distribution, as well as L</SYNOPSIS>
+above, for some usage examples; detailed documentation is not yet available.
+
+=head1 AUTHORS
+
+Audrey Tang E<lt>cpan at audreyt.orgE<gt>
+
+=head1 COPYRIGHT (The "MIT" License)
+
+Copyright 2002-2006 by Audrey Tang <cpan at audreyt.org>.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is fur-
+nished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FIT-
+NESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE X
+CONSORTIUM BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+=cut

Added: MessageBus/lib/MessageBus/Cache/DBM_Deep.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Cache/DBM_Deep.pm	Tue Oct 24 19:53:38 2006
@@ -0,0 +1,98 @@
+package MessageBus::Cache::DBM_Deep;
+use strict;
+use base 'MessageBus::Cacheable';
+use DBM::Deep;
+use File::Temp qw/ tempfile /;
+
+# Cache backend that keeps its data in a DBM::Deep file.  The object is a
+# blessed reference to the DBM::Deep handle.
+#
+# Keys used in the database:
+#   <message key>  => [$time, $value]
+#   "<channel>#"   => { <publisher id> => <index of its latest message> }
+
+# $class->new( [$file] )
+#
+# Wraps a DBM::Deep handle on $file; when no file name is given, the
+# temporary file named by default_config() is used.
+sub new {
+    my ($class, $file) = @_;
+    my $db = DBM::Deep->new($file || $class->default_config);
+    return bless \$db, $class;
+}
+
+# Returns the name of a newly created temporary file.  It is requested with
+# UNLINK => 1, i.e. File::Temp is asked to remove it at program exit.
+sub default_config {
+    my (undef, $filename) = tempfile(UNLINK => 1);
+    return $filename;
+}
+
+# $self->fetch(@keys)
+#
+# Returns what is stored under each key, in the order the keys were given.
+sub fetch {
+    my ($self, @keys) = @_;
+    return map { $$self->get($_) } @keys;
+}
+
+# $self->store($key, $val, $time, $expiry)
+#
+# Saves [$time, $val] under $key.  $expiry is accepted but not used by this
+# backend.
+sub store {
+    my ($self, $key, $val, $time, $expiry) = @_;
+    return $$self->put($key, [$time, $val]);
+}
+
+# $self->publisher_indices($chan)
+#
+# Returns a plain-hash copy of the channel's { publisher id => index }
+# table (an empty hash when the channel has no table yet).
+sub publisher_indices {
+    my ($self, $chan) = @_;
+    return { %{ $$self->get("$chan#") || {} } };
+}
+
+# $self->add_publisher($chan, $pub)
+#
+# Enters $pub into the channel's table with index 0.
+sub add_publisher {
+    my ($self, $chan, $pub) = @_;
+
+    # Start from the table stored under "$chan#", the key it is written
+    # back to, so the channel's other publishers are kept.
+    my $pubs = $self->publisher_indices($chan);
+    $pubs->{$pub} = 0;
+    return $$self->put("$chan#", $pubs);
+}
+
+# $self->remove_publisher($chan, $pub)
+#
+# Drops $pub from the channel's table, leaving other publishers in place.
+sub remove_publisher {
+    my ($self, $chan, $pub) = @_;
+    my $pubs = $self->publisher_indices($chan);
+    delete $pubs->{$pub};
+    return $$self->put("$chan#", $pubs);
+}
+
+# $self->get_index($chan, $pub)
+#
+# Returns the index recorded for $pub on $chan, or undef if there is none.
+sub get_index {
+    my ($self, $chan, $pub) = @_;
+    return $self->publisher_indices($chan)->{$pub};
+}
+
+# $self->set_index($chan, $pub, $idx)
+#
+# Records $idx as the index of $pub on $chan.
+sub set_index {
+    my ($self, $chan, $pub, $idx) = @_;
+    my $pubs = $self->publisher_indices($chan);
+    $pubs->{$pub} = $idx;
+    return $$self->put("$chan#", $pubs);
+}
+
+1;

Added: MessageBus/lib/MessageBus/Cache/Memcached.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Cache/Memcached.pm	Tue Oct 24 19:53:38 2006
@@ -0,0 +1,99 @@
+package MessageBus::Cache::Memcached;
+use strict;
+use base 'MessageBus::Cacheable';
+use Cache::Memcached;
+
+# Cache backend that keeps its data in memcached through Cache::Memcached.
+# The object is a blessed reference to the Cache::Memcached client.
+#
+# Keys used on the server:
+#   <message key>  => [$time, $value]
+#   "<channel>#"   => { <publisher id> => <index of its latest message> }
+
+# $class->new( [\%config] )
+#
+# Wraps a Cache::Memcached client built from %config, or from
+# default_config() when no configuration is given.
+sub new {
+    my ($class, $config) = @_;
+    my $client = Cache::Memcached->new($config || $class->default_config);
+    return bless \$client, $class;
+}
+
+# Returns the default client configuration: one server on 127.0.0.1:11211
+# with debugging off.
+sub default_config {
+    return {
+        servers => ['127.0.0.1:11211'],
+        debug   => 0,
+    };
+}
+
+# $self->fetch(@keys)
+#
+# Returns the values get_multi() reports for @keys.  They come out of a
+# hash, so their order is unspecified.
+sub fetch {
+    my ($self, @keys) = @_;
+    return values %{ $$self->get_multi(@keys) || {} };
+}
+
+# $self->store($key, $val, $time, $expiry)
+#
+# Saves [$time, $val] under $key, passing $expiry on to set().
+sub store {
+    my ($self, $key, $val, $time, $expiry) = @_;
+    return $$self->set($key, [$time, $val], $expiry);
+}
+
+# $self->publisher_indices($chan)
+#
+# Returns the channel's { publisher id => index } table, or an empty hash
+# when nothing is stored for the channel.
+sub publisher_indices {
+    my ($self, $chan) = @_;
+    return $$self->get("$chan#") || {};
+}
+
+# $self->add_publisher($chan, $pub)
+#
+# Enters $pub into the channel's table with index 0.
+sub add_publisher {
+    my ($self, $chan, $pub) = @_;
+
+    # Start from the table stored under "$chan#", the key it is written
+    # back to, so the channel's other publishers are kept.
+    my $pubs = $self->publisher_indices($chan);
+    $pubs->{$pub} = 0;
+    return $$self->set("$chan#", $pubs);
+}
+
+# $self->remove_publisher($chan, $pub)
+#
+# Drops $pub from the channel's table, leaving other publishers in place.
+sub remove_publisher {
+    my ($self, $chan, $pub) = @_;
+    my $pubs = $self->publisher_indices($chan);
+    delete $pubs->{$pub};
+    return $$self->set("$chan#", $pubs);
+}
+
+# $self->get_index($chan, $pub)
+#
+# Returns the index recorded for $pub on $chan, or undef if there is none.
+sub get_index {
+    my ($self, $chan, $pub) = @_;
+    return $self->publisher_indices($chan)->{$pub};
+}
+
+# $self->set_index($chan, $pub, $idx)
+#
+# Records $idx as the index of $pub on $chan.
+sub set_index {
+    my ($self, $chan, $pub, $idx) = @_;
+    my $pubs = $self->publisher_indices($chan);
+    $pubs->{$pub} = $idx;
+    return $$self->set("$chan#", $pubs);
+}
+
+1;

Added: MessageBus/lib/MessageBus/Cache/PlainHash.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Cache/PlainHash.pm	Tue Oct 24 19:53:38 2006
@@ -0,0 +1,64 @@
+package MessageBus::Cache::PlainHash;
+use strict;
+use base 'MessageBus::Cacheable';
+
+# Backend that keeps everything in a single hash private to this file.
+# Entries held in it:
+#   <message key> => [$time, $value]
+#   <channel>     => { <publisher id> => <index of its latest message> }
+my %cache;
+
+# The "object" is just the class name: all state lives in %cache.
+sub new { return __PACKAGE__ }
+
+# $self->fetch(@keys) -- the entries stored under @keys, in that order.
+sub fetch {
+    my ($self, @keys) = @_;
+    return @cache{@keys};
+}
+
+# $self->store($key, $val, $time, $expiry) -- saves [$time, $val] under
+# $key.  $expiry is accepted but not used.
+sub store {
+    my ($self, $key, $val, $time, $expiry) = @_;
+    my $entry = [$time, $val];
+    return $cache{$key} = $entry;
+}
+
+# $self->publisher_indices($chan) -- a copy of the channel's
+# { publisher id => index } table; empty if the channel has none.
+sub publisher_indices {
+    my ($self, $chan) = @_;
+    my $table = $cache{$chan} || {};
+    return { %$table };
+}
+
+# $self->add_publisher($chan, $pub) -- enters $pub with index 0.
+sub add_publisher {
+    my ($self, $chan, $pub) = @_;
+    my $table = $cache{$chan} ||= {};
+    return $table->{$pub} = 0;
+}
+
+# $self->remove_publisher($chan, $pub) -- drops $pub from the table.
+sub remove_publisher {
+    my ($self, $chan, $pub) = @_;
+    my $table = $cache{$chan} ||= {};
+    return delete $table->{$pub};
+}
+
+# $self->get_index($chan, $pub) -- the index recorded for $pub, if any.
+sub get_index {
+    my ($self, $chan, $pub) = @_;
+    my $table = $cache{$chan} ||= {};
+    return $table->{$pub};
+}
+
+# $self->set_index($chan, $pub, $idx) -- records $idx for $pub.
+sub set_index {
+    my ($self, $chan, $pub, $idx) = @_;
+    my $table = $cache{$chan} ||= {};
+    return $table->{$pub} = $idx;
+}
+
+1;

Added: MessageBus/lib/MessageBus/Cacheable.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Cacheable.pm	Tue Oct 24 19:53:38 2006
@@ -0,0 +1,55 @@
+package MessageBus::Cacheable;
+use strict;
+use Time::HiRes 'time';
+
+# Base class of the cache backends.  get() and put() below are written in
+# terms of the following methods, which this file does not define (the
+# signatures are given in Perl 6 notation):
+
+#method fetch                (Str *@keys --> List of Pair)                   { ... }
+#method store                (Str $key, Str $val, Num $time, Num $expiry)    { ... }
+
+#method add_publisher        (Str $chan, Str $pub)                           { ... }
+#method remove_publisher     (Str $chan, Str $pub)                           { ... }
+
+#method get_index            (Str $chan, Str $pub --> Int)                   { ... }
+#method set_index            (Str $chan, Str $pub, Int $index)               { ... }
+
+#method publisher_indices    (Str $chan --> Hash of Int)                     { ... }
+
+# $self->get($chan, $orig, $curr)
+#
+# $orig and $curr are hash refs keyed by publisher id.  For each publisher
+# in $curr, the keys "$chan-$pub-$n" are built for $n running from that
+# publisher's value in $orig plus one (a missing value counts as 0) up to
+# its value in $curr.  All keys are passed to fetch() in a single call and
+# the result is returned sorted numerically by each entry's first element.
+sub get {
+    my ($self, $chan, $orig, $curr) = @_;
+
+    # A publisher absent from $orig yields undef + 1, which is meant to be 1.
+    no warnings 'uninitialized';
+
+    my @keys;
+    for my $pub (keys %$curr) {
+        my $first = $orig->{$pub} + 1;
+        my $last  = $curr->{$pub};
+        push @keys, map { "$chan-$pub-$_" } $first .. $last;
+    }
+
+    return sort { $a->[0] <=> $b->[0] } $self->fetch(@keys);
+}
+
+# $self->put($chan, $pub, $msg, $expiry)
+#
+# Stores $msg under the key that follows the publisher's current index,
+# stamped with the Time::HiRes time, and then advances the index.
+sub put {
+    my ($self, $chan, $pub, $msg, $expiry) = @_;
+    my $next = $self->get_index($chan, $pub) + 1;
+    my $key  = "$chan-$pub-$next";
+    $self->store($key, $msg, time(), $expiry);
+    return $self->set_index($chan, $pub, $next);
+}
+
+1;

Added: MessageBus/lib/MessageBus/Pub.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Pub.pm	Tue Oct 24 19:53:38 2006
@@ -0,0 +1,57 @@
+package MessageBus::Pub;
+use strict;
+use Data::UUID;
+use Class::InsideOut qw( public private register id );
+
+# Inside-out properties, keyed by object id.
+public  chan    => my %chan;    # array ref of channel names
+private uuid    => my %uuid;    # identifier of this publisher in the cache
+private cache   => my %cache;   # cache backend handed to new()
+
+# MessageBus::Pub->new($cache, @channels)
+#
+# Creates a publisher with a fresh Data::UUID identifier and registers it
+# with $cache on each of @channels.
+sub new {
+    my ($class, $cache, @chan) = @_;
+    my $id = id( my $self = register( bless \(my $s), $class ) );
+    $chan{$id}  = \@chan;
+    $uuid{$id}  = Data::UUID->new->create_b64;
+    $cache{$id} = $cache;
+    $cache->add_publisher($_, $uuid{$id}) for @chan;
+    return $self;
+}
+
+# $pub->msg($msg)
+#
+# Publishes $msg on every channel of this publisher.
+sub msg {
+    my ($self, $msg) = @_;
+    my $id = id($self);
+    $cache{$id}->put($_, $uuid{$id}, $msg) for @{$chan{$id}};
+    return;
+}
+
+# Unregisters the publisher from its channels when the object goes away.
+#
+# Defining DESTROY here takes the place of the DESTROY that arrives with
+# the Class::InsideOut import (which is why 'redefine' warnings are off),
+# so this method must also release the object's property slots itself;
+# otherwise they would stay in the hashes above for good.
+no warnings 'redefine';
+sub DESTROY {
+    my $id = id(my $self = shift);
+
+    # Empty the slots first so they are released even if the cache dies.
+    my $channels = delete $chan{$id};
+    my $pub_id   = delete $uuid{$id};
+    my $backend  = delete $cache{$id};
+
+    # Nothing to unregister for a half-built object, or when the cache
+    # has already gone during global destruction.
+    return unless $backend && $channels;
+    $backend->remove_publisher($_, $pub_id) for @$channels;
+    return;
+}
+
+1;

Added: MessageBus/lib/MessageBus/Sub.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Sub.pm	Tue Oct 24 19:53:38 2006
@@ -0,0 +1,64 @@
+package MessageBus::Sub;
+
+use strict;
+use Class::InsideOut qw( public private register id );
+
+# Inside-out properties, keyed by object id.
+public  chan    => my %chan;    # array ref of subscribed channel names
+private pubs    => my %pubs;    # channel => publisher indices seen so far
+private cache   => my %cache;   # cache backend handed to new()
+
+# MessageBus::Sub->new($cache, @channels)
+#
+# Creates a subscriber and records, for each channel, the publisher
+# indices the cache reports at this moment.
+sub new {
+    my ($class, $cache, @chan) = @_;
+    my $self = register( bless \(my $s), $class );
+    my $id   = id($self);
+
+    my %seen = map { ($_ => $cache->publisher_indices($_)) } @chan;
+    $pubs{$id}  = \%seen;
+    $cache{$id} = $cache;
+    $chan{$id}  = \@chan;
+    return $self;
+}
+
+# $sub->get_all
+#
+# Returns a hash ref mapping every subscribed channel to an array ref of
+# the entries the cache's get() yields for the span between the indices
+# stored by the previous call and the current ones.  The current indices
+# are stored for the next call.
+sub get_all {
+    my $self = shift;
+    my $id   = id($self);
+
+    my %entries_of;
+    for my $name (@{ $chan{$id} }) {
+        my $orig = $pubs{$id}{$name};
+        my $curr = $pubs{$id}{$name} = $cache{$id}->publisher_indices($name);
+        $entries_of{$name} = [ $cache{$id}->get($name, $orig, $curr) ];
+    }
+    return \%entries_of;
+}
+
+# $sub->get( [$chan] )
+#
+# Returns the messages (second element of each entry) of $chan, or of the
+# first subscribed channel when none is given, for the span since the
+# indices stored previously.  List context gets a list, any other context
+# an array ref.
+sub get {
+    my $self = shift;
+    my $id   = id($self);
+    my $chan = @_ ? shift : $chan{$id}[0];
+
+    my $orig = $pubs{$id}{$chan};
+    my $curr = $pubs{$id}{$chan} = $cache{$id}->publisher_indices($chan);
+
+    my @msgs = map { $_->[1] } $cache{$id}->get($chan, $orig, $curr);
+    return wantarray ? @msgs : \@msgs;
+}
+
+1;

Added: MessageBus/t/basic.t
==============================================================================
--- (empty file)
+++ MessageBus/t/basic.t	Tue Oct 24 19:53:38 2006
@@ -0,0 +1,42 @@
+use strict;
+use Test::More tests => 18;
+use MessageBus;
+use IO::Socket::INET;
+
+# The same six checks are run against each backend (3 x 6 = 18 tests).
+my @backends = qw(PlainHash DBM_Deep Memcached);
+
+SKIP: for (@backends) {
+    if ($_ eq 'Memcached') {
+        # This backend needs the Cache::Memcached module and a server on
+        # 127.0.0.1:11211; if either is missing its six checks are skipped.
+        eval { require Cache::Memcached; 1 }
+            or skip("Cache::Memcached not installed", 6);
+        IO::Socket::INET->new('127.0.0.1:11211')
+            or skip("Memcached not started", 6);
+    }
+
+    my $bus = MessageBus->new($_);
+
+    # Subscribers and the publisher all use the default channel ''.
+    my @sub; $sub[0] = $bus->subscribe;
+
+    is_deeply([map {$_->[1]} @{$sub[0]->get_all->{''}}], [], 'get_all worked when there is no pubs');
+    is_deeply([$sub[0]->get], [], 'get worked when there is no pubs');
+
+    my $pub = $bus->publish;
+
+    $pub->msg('foo');
+
+    # Subscribed after 'foo' went out, so it should only see 'bar' and 'baz'.
+    $sub[1] = $bus->subscribe;
+
+    $pub->msg('bar');
+    $pub->msg('baz');
+
+    is_deeply([$sub[0]->get], [qw< foo bar baz >], 'get worked');
+    is_deeply([$sub[0]->get], [], 'get emptied the cache');
+
+    is_deeply([map {$_->[1]} @{$sub[1]->get_all->{''}}], [qw< bar baz >], 'get_all worked');
+    is_deeply([map {$_->[1]} @{$sub[1]->get_all->{''}}], [], 'get_all emptied the cache');
+}


More information about the Rt-commit mailing list