[Bps-public-commit] smokingit-worker branch, master, updated. f3f9989167f23000de4d03cd6df054eacc3dacfa

Alex Vandiver alexmv at bestpractical.com
Sun Nov 4 23:48:12 EST 2012


The branch, master, has been updated
       via  f3f9989167f23000de4d03cd6df054eacc3dacfa (commit)
       via  4f044a7cec1fb1570f8fc6b537addc265b1f3d55 (commit)
       via  1ce05d1e2373ca1bb669894991dee9e29ac08834 (commit)
      from  6dd7de3b31567d3710f07879c094312f16c2dc3f (commit)

Summary of changes:
 bin/smokingit-worker    | 31 ++++++++++----------
 lib/Smokingit/Worker.pm | 77 +++++++++++++++++++++++++++++++++++--------------
 2 files changed, 71 insertions(+), 37 deletions(-)

- Log -----------------------------------------------------------------
commit 1ce05d1e2373ca1bb669894991dee9e29ac08834
Author: Alex Vandiver <alexmv at bestpractical.com>
Date:   Wed Jun 20 01:35:56 2012 -0400

    Move to using AnyEvent::RabbitMQ::RPC instead of Gearman
    
    Using AMQP as a backend allows for reliable message delivery, and also
    provides a general-purpose message bus for other uses.

diff --git a/bin/smokingit-worker b/bin/smokingit-worker
index fe13beb..87c0f81 100755
--- a/bin/smokingit-worker
+++ b/bin/smokingit-worker
@@ -5,34 +5,35 @@ use warnings;
 use lib 'lib';
 use Smokingit::Worker;
 use Getopt::Long;
-use IO::Socket::INET;
 
 my $jobs    = 5;
 my $repos   = "repos";
-my $servers = ['127.0.0.1:4730'];
+
+my $host    = "localhost";
+my $port    = 5672;
+my $user    = "guest";
+my $pass    = "guest";
+my $vhost   = "/";
 
 GetOptions(
     "jobs|j=i",                   \$jobs,
     "repo-path|repos|r=s",        \$repos,
-    "gearman-server|server|s=s@", \$servers,
+    "host|h=s",                   \$host,
+    "port|p=i",                   \$port,
+    "user|U=s",                   \$user,
+    "password|pass|P=s",          \$pass,
 ) or die "Invalid options";
 
 die "Repository path $repos isn't writable!\n" unless -w $repos;
 
-for my $s (@{$servers}) {
-    my ($host, $port) = split ':', $s, 2;
-    $port ||= 4730;
-
-    my $socket = IO::Socket::INET->new( PeerHost => "$host:$port" );
-    die "Connect to $s failed: $1\n"
-        if ($@ || "") =~ /^IO::Socket::INET: (.*)/;
-
-    $s = $socket->peerhost . ":" . $port;
-}
-
 my $worker = Smokingit::Worker->new(
     max_jobs    => $jobs,
     repo_path   => $repos,
-    job_servers => $servers,
+
+    host        => $host,
+    port        => $port,
+    user        => $user,
+    pass        => $pass,
+    vhost       => $vhost,
 );
 $worker->run;
diff --git a/lib/Smokingit/Worker.pm b/lib/Smokingit/Worker.pm
index e28d9a2..ebecbe2 100644
--- a/lib/Smokingit/Worker.pm
+++ b/lib/Smokingit/Worker.pm
@@ -2,11 +2,9 @@ use strict;
 use warnings;
 
 package Smokingit::Worker;
-use base 'Gearman::Worker';
+use base 'AnyEvent::RabbitMQ::RPC';
 
 use TAP::Harness;
-
-use Gearman::Client;
 use Storable qw( nfreeze thaw );
 use YAML;
 
@@ -14,15 +12,16 @@ use Smokingit::Worker::Clean::TmpFiles;
 use Smokingit::Worker::Clean::Postgres;
 use Smokingit::Worker::Clean::Mysql;
 
-use fields qw(max_jobs repo_path client);
-
 sub new {
     my $class = shift;
     my %args = (
         max_jobs => 5,
+        serialize => 'Storable',
         @_,
     );
-    my $self = $class->SUPER::new(%args);
+    my $self = $class->SUPER::new(
+        %args,
+    );
     $self->{max_jobs} = $args{max_jobs};
     $self->{repo_path} = $args{repo_path};
     die "No valid repository path set!"
@@ -43,27 +42,21 @@ sub max_jobs {
     $self->{max_jobs} = shift || 1;
 }
 
-sub client {
-    my $self = shift;
-    return $self->{client};
-}
-
 sub run {
     my $self = shift;
     chdir($self->repo_path);
-    $self->register_function( run_tests => sub {$self->run_tests(@_)} );
-    $self->{client} = Gearman::Client->new(
-        job_servers => $self->job_servers,
+    $self->register(
+        name => "run_tests",
+        run  => sub {$self->run_tests(@_)},
     );
-    $self->work while 1;
+    AE::cv->recv;
 }
 
 my %projects;
 
 sub run_tests {
     my $self = shift;
-    my $job = shift;
-    my $request = @_ ? shift : thaw( $job->arg );
+    my $request = shift;
     my %ORIGINAL_ENV = %ENV;
 
     # Read data out of the hash they passed in
@@ -103,7 +96,10 @@ sub run_tests {
     my $error = sub {
         $result->{error} = shift;
         warn $result->{error} . "\n";
-        $self->client->do_task(post_results => nfreeze($result));
+        $self->call(
+            name => "post_results",
+            args => $result
+        );
         $cleanup->();
     };
 
@@ -148,10 +144,9 @@ sub run_tests {
         } );
     $harness->callback(
         after_test => sub {
-            $job->set_status(++$done,scalar(@tests));
+            ++$done; # No-op for now
         }
     );
-
     my $aggregator = eval {
         # Runtests apparently grows PERL5LIB -- local it so it doesn't
         # grow without bound
@@ -166,8 +161,9 @@ sub run_tests {
         for keys %{$aggregator->{parser_for}};
     $result->{aggregator} = $aggregator;
 
-    $self->client->dispatch_background(
-        post_results => nfreeze($result)
+    $self->call(
+        name => "post_results",
+        args => $result,
     );
 
     # Clean out

commit 4f044a7cec1fb1570f8fc6b537addc265b1f3d55
Author: Alex Vandiver <alexmv at bestpractical.com>
Date:   Wed Jun 20 01:39:33 2012 -0400

    Publish smoke progress on the `worker_progress` topic
    
    Use the message bus to publish updates on the progress of the smoke test.

diff --git a/lib/Smokingit/Worker.pm b/lib/Smokingit/Worker.pm
index ebecbe2..8e98e8e 100644
--- a/lib/Smokingit/Worker.pm
+++ b/lib/Smokingit/Worker.pm
@@ -4,6 +4,8 @@ use warnings;
 package Smokingit::Worker;
 use base 'AnyEvent::RabbitMQ::RPC';
 
+use AnyMQ;
+
 use TAP::Harness;
 use Storable qw( nfreeze thaw );
 use YAML;
@@ -19,9 +21,16 @@ sub new {
         serialize => 'Storable',
         @_,
     );
+    my $pubsub = AnyMQ->new_with_traits(
+        exchange => 'events',
+        %args,
+        traits => ['AMQP'],
+    );
     my $self = $class->SUPER::new(
+        connection => $pubsub->_rf,
         %args,
     );
+    $self->{pubsub} = $pubsub;
     $self->{max_jobs} = $args{max_jobs};
     $self->{repo_path} = $args{repo_path};
     die "No valid repository path set!"
@@ -30,6 +39,13 @@ sub new {
     return $self;
 }
 
+sub publish {
+    my $self = shift;
+    my (%msg) = @_;
+    $msg{type} = "worker_progress";
+    $self->{pubsub}->topic($msg{type})->publish(\%msg);
+}
+
 sub repo_path {
     my $self = shift;
     return $self->{repo_path} unless @_;
@@ -59,6 +75,11 @@ sub run_tests {
     my $request = shift;
     my %ORIGINAL_ENV = %ENV;
 
+    $self->publish(
+        smoke_id => $request->{smoke_id},
+        status   => "started",
+    );
+
     # Read data out of the hash they passed in
     my $project = $request->{project};
     my $url     = $request->{repository_url};
@@ -125,6 +146,10 @@ sub run_tests {
 
     # Run configure
     if ($config =~ /\S/) {
+        $self->publish(
+            smoke_id => $request->{smoke_id},
+            status   => "configuring",
+        );
         $config =~ s/\s*;?\s*\n+/ && /g;
         my $output = `($config) 2>&1`;
         my $ret = $?;
@@ -142,9 +167,21 @@ sub run_tests {
             lib => [".", "lib"],
             switches => "-w",
         } );
+
+    $self->publish(
+        smoke_id => $request->{smoke_id},
+        status   => "testing",
+        complete => $done,
+        total    => scalar(@tests),
+    );
     $harness->callback(
         after_test => sub {
-            ++$done; # No-op for now
+            $self->publish(
+                smoke_id => $request->{smoke_id},
+                status   => "testing",
+                complete => ++$done,
+                total    => scalar(@tests),
+            );
         }
     );
     my $aggregator = eval {

commit f3f9989167f23000de4d03cd6df054eacc3dacfa
Merge: 6dd7de3 4f044a7
Author: Alex Vandiver <alexmv at bestpractical.com>
Date:   Tue Sep 4 02:34:21 2012 -0400

    Merge branch 'pubsub'


-----------------------------------------------------------------------



More information about the Bps-public-commit mailing list