[Bps-public-commit] smokingit-worker branch, master, updated. f3f9989167f23000de4d03cd6df054eacc3dacfa
Alex Vandiver
alexmv at bestpractical.com
Sun Nov 4 23:48:12 EST 2012
The branch, master has been updated
via f3f9989167f23000de4d03cd6df054eacc3dacfa (commit)
via 4f044a7cec1fb1570f8fc6b537addc265b1f3d55 (commit)
via 1ce05d1e2373ca1bb669894991dee9e29ac08834 (commit)
from 6dd7de3b31567d3710f07879c094312f16c2dc3f (commit)
Summary of changes:
bin/smokingit-worker | 31 ++++++++++----------
lib/Smokingit/Worker.pm | 77 +++++++++++++++++++++++++++++++++++--------------
2 files changed, 71 insertions(+), 37 deletions(-)
- Log -----------------------------------------------------------------
commit 1ce05d1e2373ca1bb669894991dee9e29ac08834
Author: Alex Vandiver <alexmv at bestpractical.com>
Date: Wed Jun 20 01:35:56 2012 -0400
Move to using AnyEvent::RabbitMQ::RPC instead of Gearman
Using AMQP as a backend allows for reliable message delivery, as well as
providing a general-use message bus for other uses.
diff --git a/bin/smokingit-worker b/bin/smokingit-worker
index fe13beb..87c0f81 100755
--- a/bin/smokingit-worker
+++ b/bin/smokingit-worker
@@ -5,34 +5,35 @@ use warnings;
use lib 'lib';
use Smokingit::Worker;
use Getopt::Long;
-use IO::Socket::INET;
my $jobs = 5;
my $repos = "repos";
-my $servers = ['127.0.0.1:4730'];
+
+my $host = "localhost";
+my $port = 5672;
+my $user = "guest";
+my $pass = "guest";
+my $vhost = "/";
GetOptions(
"jobs|j=i", \$jobs,
"repo-path|repos|r=s", \$repos,
- "gearman-server|server|s=s@", \$servers,
+ "host|h=s", \$host,
+ "port|p=i", \$port,
+ "user|U=s", \$user,
+ "password|pass|P=s", \$pass,
) or die "Invalid options";
die "Repository path $repos isn't writable!\n" unless -w $repos;
-for my $s (@{$servers}) {
- my ($host, $port) = split ':', $s, 2;
- $port ||= 4730;
-
- my $socket = IO::Socket::INET->new( PeerHost => "$host:$port" );
- die "Connect to $s failed: $1\n"
- if ($@ || "") =~ /^IO::Socket::INET: (.*)/;
-
- $s = $socket->peerhost . ":" . $port;
-}
-
my $worker = Smokingit::Worker->new(
max_jobs => $jobs,
repo_path => $repos,
- job_servers => $servers,
+
+ host => $host,
+ port => $port,
+ user => $user,
+ pass => $pass,
+ vhost => $vhost,
);
$worker->run;
diff --git a/lib/Smokingit/Worker.pm b/lib/Smokingit/Worker.pm
index e28d9a2..ebecbe2 100644
--- a/lib/Smokingit/Worker.pm
+++ b/lib/Smokingit/Worker.pm
@@ -2,11 +2,9 @@ use strict;
use warnings;
package Smokingit::Worker;
-use base 'Gearman::Worker';
+use base 'AnyEvent::RabbitMQ::RPC';
use TAP::Harness;
-
-use Gearman::Client;
use Storable qw( nfreeze thaw );
use YAML;
@@ -14,15 +12,16 @@ use Smokingit::Worker::Clean::TmpFiles;
use Smokingit::Worker::Clean::Postgres;
use Smokingit::Worker::Clean::Mysql;
-use fields qw(max_jobs repo_path client);
-
sub new {
my $class = shift;
my %args = (
max_jobs => 5,
+ serialize => 'Storable',
@_,
);
- my $self = $class->SUPER::new(%args);
+ my $self = $class->SUPER::new(
+ %args,
+ );
$self->{max_jobs} = $args{max_jobs};
$self->{repo_path} = $args{repo_path};
die "No valid repository path set!"
@@ -43,27 +42,21 @@ sub max_jobs {
$self->{max_jobs} = shift || 1;
}
-sub client {
- my $self = shift;
- return $self->{client};
-}
-
sub run {
my $self = shift;
chdir($self->repo_path);
- $self->register_function( run_tests => sub {$self->run_tests(@_)} );
- $self->{client} = Gearman::Client->new(
- job_servers => $self->job_servers,
+ $self->register(
+ name => "run_tests",
+ run => sub {$self->run_tests(@_)},
);
- $self->work while 1;
+ AE::cv->recv;
}
my %projects;
sub run_tests {
my $self = shift;
- my $job = shift;
- my $request = @_ ? shift : thaw( $job->arg );
+ my $request = shift;
my %ORIGINAL_ENV = %ENV;
# Read data out of the hash they passed in
@@ -103,7 +96,10 @@ sub run_tests {
my $error = sub {
$result->{error} = shift;
warn $result->{error} . "\n";
- $self->client->do_task(post_results => nfreeze($result));
+ $self->call(
+ name => "post_results",
+ args => $result
+ );
$cleanup->();
};
@@ -148,10 +144,9 @@ sub run_tests {
} );
$harness->callback(
after_test => sub {
- $job->set_status(++$done,scalar(@tests));
+ ++$done; # No-op for now
}
);
-
my $aggregator = eval {
# Runtests apparently grows PERL5LIB -- local it so it doesn't
# grow without bound
@@ -166,8 +161,9 @@ sub run_tests {
for keys %{$aggregator->{parser_for}};
$result->{aggregator} = $aggregator;
- $self->client->dispatch_background(
- post_results => nfreeze($result)
+ $self->call(
+ name => "post_results",
+ args => $result,
);
# Clean out
commit 4f044a7cec1fb1570f8fc6b537addc265b1f3d55
Author: Alex Vandiver <alexmv at bestpractical.com>
Date: Wed Jun 20 01:39:33 2012 -0400
Publish smoke progress on the `worker_progress` topic
Use the message bus to publish updates on the progress of the smoke test
diff --git a/lib/Smokingit/Worker.pm b/lib/Smokingit/Worker.pm
index ebecbe2..8e98e8e 100644
--- a/lib/Smokingit/Worker.pm
+++ b/lib/Smokingit/Worker.pm
@@ -4,6 +4,8 @@ use warnings;
package Smokingit::Worker;
use base 'AnyEvent::RabbitMQ::RPC';
+use AnyMQ;
+
use TAP::Harness;
use Storable qw( nfreeze thaw );
use YAML;
@@ -19,9 +21,16 @@ sub new {
serialize => 'Storable',
@_,
);
+ my $pubsub = AnyMQ->new_with_traits(
+ exchange => 'events',
+ %args,
+ traits => ['AMQP'],
+ );
my $self = $class->SUPER::new(
+ connection => $pubsub->_rf,
%args,
);
+ $self->{pubsub} = $pubsub;
$self->{max_jobs} = $args{max_jobs};
$self->{repo_path} = $args{repo_path};
die "No valid repository path set!"
@@ -30,6 +39,13 @@ sub new {
return $self;
}
+sub publish {
+ my $self = shift;
+ my (%msg) = @_;
+ $msg{type} = "worker_progress";
+ $self->{pubsub}->topic($msg{type})->publish(\%msg);
+}
+
sub repo_path {
my $self = shift;
return $self->{repo_path} unless @_;
@@ -59,6 +75,11 @@ sub run_tests {
my $request = shift;
my %ORIGINAL_ENV = %ENV;
+ $self->publish(
+ smoke_id => $request->{smoke_id},
+ status => "started",
+ );
+
# Read data out of the hash they passed in
my $project = $request->{project};
my $url = $request->{repository_url};
@@ -125,6 +146,10 @@ sub run_tests {
# Run configure
if ($config =~ /\S/) {
+ $self->publish(
+ smoke_id => $request->{smoke_id},
+ status => "configuring",
+ );
$config =~ s/\s*;?\s*\n+/ && /g;
my $output = `($config) 2>&1`;
my $ret = $?;
@@ -142,9 +167,21 @@ sub run_tests {
lib => [".", "lib"],
switches => "-w",
} );
+
+ $self->publish(
+ smoke_id => $request->{smoke_id},
+ status => "testing",
+ complete => $done,
+ total => scalar(@tests),
+ );
$harness->callback(
after_test => sub {
- ++$done; # No-op for now
+ $self->publish(
+ smoke_id => $request->{smoke_id},
+ status => "testing",
+ complete => ++$done,
+ total => scalar(@tests),
+ );
}
);
my $aggregator = eval {
commit f3f9989167f23000de4d03cd6df054eacc3dacfa
Merge: 6dd7de3 4f044a7
Author: Alex Vandiver <alexmv at bestpractical.com>
Date: Tue Sep 4 02:34:21 2012 -0400
Merge branch 'pubsub'
-----------------------------------------------------------------------
More information about the Bps-public-commit
mailing list