[Bps-public-commit] smokingit-worker branch, master, updated. e53b29924a0d86fca8ceae3ed2f1398ae92eb7d7

Alex Vandiver alexmv at bestpractical.com
Sun May 4 14:32:55 EDT 2014


The branch, master has been updated
       via  e53b29924a0d86fca8ceae3ed2f1398ae92eb7d7 (commit)
       via  43df96bf99aa70e77e917899f33e33d43b1af64c (commit)
       via  98bbebee807d7228d3cbebffb928b456b02ac7ad (commit)
       via  cee63fd20b5d8b39bfdbf107a6670efbbb65d1a2 (commit)
      from  1bf900537706f103d26464883b9e6900e7daabd3 (commit)

Summary of changes:
 bin/smokingit-worker                   |  15 +--
 lib/Smokingit/Worker.pm                |  92 ++++++++++-------
 lib/TAP/Harness/AnyEvent.pm            | 176 +++++++++++++++++++++++++++++++++
 lib/TAP/Parser/Multiplexer/AnyEvent.pm | 140 ++++++++++----------------
 4 files changed, 289 insertions(+), 134 deletions(-)
 create mode 100644 lib/TAP/Harness/AnyEvent.pm

- Log -----------------------------------------------------------------
commit cee63fd20b5d8b39bfdbf107a6670efbbb65d1a2
Author: Alex Vandiver <alexmv at bestpractical.com>
Date:   Sun May 4 14:23:28 2014 -0400

    Switch to a complete event-based backend
    
    The previous implementation used techniques to still present an iterator
    interface; while the "select" had been replaced by I/O events, the
    iterator nature still required a blocking call.  This caused odd
    interactions with the event-based RPC calls that the worker used
    elsewhere, causing them to be delayed or sent out of order.
    
    By switching to a complete event-based interface, these problems are
    prevented, and the complexity of presenting as an iterator can be
    removed.  TAP::Harness already presents callbacks to its users, making
    adaption of an event-driven interface mostly transparent to its
    consumers.

diff --git a/lib/Smokingit/Worker.pm b/lib/Smokingit/Worker.pm
index 89f3607..6b0f907 100644
--- a/lib/Smokingit/Worker.pm
+++ b/lib/Smokingit/Worker.pm
@@ -10,6 +10,7 @@ use AnyEvent::Util;
 use AnyMQ;
 
 use TAP::Harness;
+use TAP::Harness::AnyEvent;
 use TAP::Parser::Multiplexer::AnyEvent;
 use Storable qw( nfreeze thaw );
 use YAML;
@@ -185,11 +186,10 @@ sub run_tests {
     # Progress indicator via Gearman
     my $done = 0;
     my @tests = glob($tests);
-    my $harness = TAP::Harness->new( {
+    my $harness = TAP::Harness::AnyEvent->new( {
             jobs       => $jobs,
             lib        => [".", "lib"],
             switches   => "-w",
-            multiplexer_class => 'TAP::Parser::Multiplexer::AnyEvent',
         } );
     $harness->diag_merge(1) if $harness->can("diag_merge");
 
@@ -216,14 +216,17 @@ sub run_tests {
                     smoke_result_id => $request->{smoke_id},
                     %{ $result->{test}{$filename} },
                 },
+                on_sent => sub {
+                    $self->publish(
+                        smoke_id => $request->{smoke_id},
+                        status   => "testing",
+                        complete => ++$done,
+                        total    => scalar(@tests),
+                    );
+                    return;
+                },
             );
-            $self->publish(
-                smoke_id => $request->{smoke_id},
-                status   => "testing",
-                complete => ++$done,
-                total    => scalar(@tests),
-            );
-            return 1;
+            return;
         }
     );
     $harness->callback(
@@ -235,35 +238,38 @@ sub run_tests {
         }
     );
 
-    my $aggregator = eval {
-        # Runtests apparently grows PERL5LIB -- local it so it doesn't
-        # grow without bound
-        local $ENV{PERL5LIB} = $ENV{PERL5LIB};
-        $harness->runtests(@tests);
-    } or return $error->("Testing bailed out!\n\n$@");
-    $result->{is_ok} = not $aggregator->has_problems;
-    $result->{elapsed} = $result->{end} - $result->{start};
-    $result->{$_} = $aggregator->$_ for
-        qw/failed
-           parse_errors
-           passed
-           planned
-           skipped
-           todo
-           todo_passed
-           total
-           wait
-           exit
-          /;
-
-    $self->call(
-        name => "post_results",
-        args => $result,
-    );
+    $harness->callback( after_runtests => sub {
+        my ($aggregator) = @_;
+        $result->{is_ok} = not $aggregator->has_problems;
+        $result->{elapsed} = $result->{end} - $result->{start};
+        $result->{$_} = $aggregator->$_ for
+            qw/failed
+               parse_errors
+               passed
+               planned
+               skipped
+               todo
+               todo_passed
+               total
+               wait
+               exit
+              /;
+
+        # Clean out
+        $cleanup->();
+
+        # And send the reply
+        $self->call(
+            name => "post_results",
+            args => $result,
+            on_reply => sub {
+                # Ensure the reply goes through before we ack everything
+                $args{on_success}->(1);
+            },
+        );
+    });
 
-    # Clean out
-    $cleanup->();
-    $args{on_success}->(1);
+    $harness->runtests(@tests);
 }
 
 1;
diff --git a/lib/TAP/Harness/AnyEvent.pm b/lib/TAP/Harness/AnyEvent.pm
new file mode 100644
index 0000000..b2b6409
--- /dev/null
+++ b/lib/TAP/Harness/AnyEvent.pm
@@ -0,0 +1,176 @@
+package TAP::Harness::AnyEvent;
+
+use strict;
+use vars qw($VERSION @ISA);
+
+use AnyEvent;
+use AnyEvent::Util qw//;
+
+ at ISA = 'TAP::Harness';
+
+=head1 NAME
+
+TAP::Harness::AnyEvent - AnyEvent-based TAP harness
+
+=head1 VERSION
+
+Version 1.0
+
+=cut
+
+$VERSION = 1.0;
+
+=head1 DESCRIPTION
+
+This provides a purely event-based alternative to L<TAP::Harness>, based
+on L<AnyEvent>.
+
+=head1 SYNOPSIS
+
+    use TAP::Harness::AnyEvent;
+    my $harness = TAP::Harness::AnyEvent->new( \%args );
+    $harness->callback(
+        after_runtests => sub { ... }
+    );
+
+    # This will return immediately:
+    $harness->runtests(@tests);
+
+This provides an alternative to L<TAP::Harness> which provides an
+entirely non-blocking harness to run and capture test output.  It
+leverages the existing L<TAP::Harness/callback> points to allow the
+standard L</runtests> to return immediately; interaction with individual
+results and aggregates is done via the C<after_test> and
+C<after_runtests> callback points.
+
+=head1 METHODS
+
+=head2 Class Methods
+
+=head3 C<new>
+
+    my %args = (
+       verbosity => 1,
+       lib       => [ 'lib', 'blib/lib', 'blib/arch' ],
+    );
+    my $harness = TAP::Harness::AnyEvent->new( \%args );
+
+The arguments are the same as for L<TAP::Harness/new>.  Under the hood,
+it uses the L<TAP::Parser::Multiplexer::AnyEvent> class as its
+L<TAP::Harness/multiplexer_class>.  Any C<multiplexer_class> which is
+explicitly provided should behave similarly.
+
+=cut
+
+sub _initialize {
+    my ($self, $args) = @_;
+    $args->{multiplexer_class} ||= 'TAP::Parser::Multiplexer::AnyEvent';
+
+    $self->SUPER::_initialize($args);
+}
+
+=head3 C<runtests>
+
+    my $cv = $harness->runtests(@tests);
+
+Unlike L<TAP::Harness>, this does not run tests and return a
+L<TAP::Parser::Aggregator>; instead, it immediately returns a
+L<condvar|AnyEvent/CONDITION VARIABLES> which will be called with a
+L<TAP::Parser::Aggregator> when the tests are complete.
+
+=cut
+
+sub runtests {
+    my ($self, @tests) = @_;
+
+    my $aggregate = $self->_construct( $self->aggregator_class );
+    $self->_make_callback( 'before_runtests', $aggregate );
+    $aggregate->start;
+
+    my $return = AnyEvent->condvar;
+    my $done = $self->aggregate_tests( $aggregate, @tests );
+    $done->cb(sub {
+        $aggregate->stop;
+        $self->summary( $aggregate );
+        $self->_make_callback( 'after_runtests', $aggregate );
+        $return->send( $aggregate );
+    });
+
+    return $return;
+}
+
+=head3 C<aggregate_tests>
+
+    my $cv = $harness->aggregate_tests( $aggregate, @tests );
+
+Runs tests in the given order, adding them to the
+L<TAP::Parser::Aggregator> given.  Returns a condvar which will be
+called with the aggregator when tests are complete.
+
+=cut
+
+sub aggregate_tests {
+    my ( $self, $aggregate, @tests ) = @_;
+
+    my $jobs      = $self->jobs;
+    my $scheduler = $self->make_scheduler(@tests);
+
+    local $ENV{HARNESS_IS_VERBOSE} = 1
+      if $self->formatter->verbosity > 0;
+    $self->formatter->prepare( map { $_->description } $scheduler->get_all );
+
+    # Keep multiplexer topped up
+    my $all_done = AnyEvent->condvar;
+    $all_done->begin( sub { shift->send( $aggregate ) } );
+
+    my $fill;
+    my $mux  = $self->_construct(
+        $self->multiplexer_class,
+        sub {
+            my ( $parser, $stash, $result ) = @_;
+            my ( $session, $job ) = @$stash;
+            if ( defined $result ) {
+                $session->result($result);
+                $self->_bailout($result) if $result->is_bailout;
+            }
+            else {
+                # End of parser. Automatically removed from the mux.
+                $self->finish_parser( $parser, $session );
+                $self->_after_test( $aggregate, $job, $parser );
+                $job->finish;
+                $all_done->end;
+
+                # And top the MUX back off again
+                $fill->();
+            }
+        }
+    );
+
+    $fill = sub {
+        while ( $mux->parsers < $jobs ) {
+            my $job = $scheduler->get_job;
+
+            # If we hit a spinner stop filling and start running.
+            return if !defined $job || $job->is_spinner;
+
+            $all_done->begin();
+            my ( $parser, $session ) = $self->make_parser($job);
+            $mux->add( $parser, [ $session, $job ] );
+        }
+    };
+
+    $fill->();
+
+    $all_done->end;
+    return $all_done;
+}
+
+=head1 See Also
+
+L<TAP::Harness::AnyEvent>
+
+L<TAP::Parser::Multiplexer>
+
+=cut
+
+1;
diff --git a/lib/TAP/Parser/Multiplexer/AnyEvent.pm b/lib/TAP/Parser/Multiplexer/AnyEvent.pm
index 01252af..c7978f0 100644
--- a/lib/TAP/Parser/Multiplexer/AnyEvent.pm
+++ b/lib/TAP/Parser/Multiplexer/AnyEvent.pm
@@ -25,26 +25,21 @@ $VERSION = '1.0';
 
     use TAP::Parser::Multiplexer::AnyEvent;
 
-    my $mux = TAP::Parser::Multiplexer->new;
+    my $mux = TAP::Parser::Multiplexer->new(
+        sub { ... }
+    );
     $mux->add( $parser1, $stash1 );
     $mux->add( $parser2, $stash2 );
-    while ( my ( $parser, $stash, $result ) = $mux->next ) {
-        # Will block waiting for input from the parsers, but still
-        # interact with other AnyEvent timers, etc
-    }
 
 =head1 DESCRIPTION
 
 L<TAP::Parser::Multiplexer> gathers input from multiple TAP::Parsers;
-this does so, but using AnyEvent as the main select loop.  A complete
-rewrite of L<TAP::Harness> to be event-driven is too complex, so this
-suffices to use the AnyEvent main loop for the main waiting-for-IO
-portion of testing.
+this does so, but using AnyEvent as the main select loop.  Results from
+the parsers will be passed to the subroutine.
 
-To use it, specify C<< multiplexer_class =>
-'TAP::Parser::Multiplexer::AnyEvent' >> to the L<TAP::Harness>
-constructor.  L<TAP::Harness/run_tests> will still block, but will be
-able to service AnyEvent events during its main loop.
+As it does not use the same C</next> interface as
+L<TAP::Parser::Multiplexer>, it is only usable by
+L<TAP::Harness::AnyEvent>.
 
 =head1 METHODS
 
@@ -52,9 +47,15 @@ able to service AnyEvent events during its main loop.
 
 =head3 C<new>
 
-    my $mux = TAP::Parser::Multiplexer::AnyEvent->new;
+    my $mux = TAP::Parser::Multiplexer::AnyEvent->new(
+        sub { ... }
+    );
 
-Returns a new C<TAP::Parser::Multiplexer::AnyEvent> object.
+Returns a new C<TAP::Parser::Multiplexer::AnyEvent> object.  The
+subroutine reference is a callback, which will be called for every
+result the multiplexer finds.  The subroutine will be called with three
+arguments -- the L<TAP::Parser> object, the stash from when the parser
+was added to the multiplexer, and the L<TAP::Parser::Result>.
 
 =cut
 
@@ -62,11 +63,7 @@ Returns a new C<TAP::Parser::Multiplexer::AnyEvent> object.
 
 sub _initialize {
     my $self = shift;
-    $self->{avid}    = [];                # Parsers that can't select
-    $self->{return}  = [];
-    $self->{handles} = [];
     $self->{count}   = 0;
-    $self->{ready}   = AnyEvent->condvar;
 
     # AnyEvent futzes with SIGCHLD.  We split the former _finish method
     # of TAP::Parser::Iterator::Process into two parts -- the exit-code
@@ -86,6 +83,8 @@ sub _initialize {
         }
     }
 
+    $self->{on_result} = shift;
+
     return $self;
 }
 
@@ -98,7 +97,7 @@ sub _initialize {
   $mux->add( $parser, $stash );
 
 Add a TAP::Parser to the multiplexer. C<$stash> is an optional opaque
-reference that will be returned from C<next> along with the parser and
+reference that will be passed to the callback, along with the parser and
 the next result.
 
 =cut
@@ -108,20 +107,34 @@ sub add {
 
     my @handles = $parser->get_select_handles;
     unless (@handles) {
-        push @{ $self->{avid} }, [ $parser, $stash ];
+        $self->{count}++;
+        # We don't want to parse it _now_, as we expect ->add() to be
+        # fast.  Rather, postpone it so we deal with it the next chance
+        # we hit the event loop.
+        AnyEvent::postpone {
+            while (1) {
+                my $result = $parser->next;
+                if ($result) {
+                    $self->{on_result}->( $parser, $stash, $result );
+                } else {
+                    $self->{count}--;
+                    $self->{on_result}->( $parser, $stash, undef );
+                    return;
+                }
+            }
+        };
         return;
     }
 
     my $it = $parser->_iterator;
     $it->{done} = AnyEvent->condvar;
     $it->{done}->begin( sub {
-        # Once _both_ exit code and reading-from-sockets is complete,
-        # push the undef that signals "this job is done" and kick the
-        # blocked ->recv in the iterator that reads the queue
+        # Once we have all of the exit code (below), parsing from
+        # sockets (below that), and closing of sockets (above), send the
+        # undef that signals this test is done.
         undef $it->{done};
-        push @{ $self->{return} }, [ $parser, $stash, undef ];
         $self->{count}--;
-        $self->{ready}->send;
+        $self->{on_result}->( $parser, $stash, undef );
     } );
 
     if ($parser->_iterator->{pid}) {
@@ -140,6 +153,7 @@ sub add {
     }
 
     for my $h (@handles) {
+        $it->{done}->begin;
         my $aeh; $aeh = AnyEvent->io(
             fh => $h,
             poll => "r",
@@ -147,16 +161,14 @@ sub add {
                 # If the filehandle has something to read, parse it
                 my $result = $parser->next;
                 if ($result) {
-                    # Not EOF?  Push onto the queue, and notify the
-                    # iterator that we just topped it off.
-                    push @{ $self->{return} },
-                        [ $parser, $stash, $result ];
-                    $self->{ready}->send;
+                    # Not EOF?  Return it.
+                    $self->{on_result}->( $parser, $stash, $result );
                 } else {
                     # If this is the end of the line, remove the
-                    # watcher.  We _don't_ push the "we're done" undef,
-                    # because we need the exit code first.
+                    # watcher.  Pushing the undef is done once all parts
+                    # of ->{done} are complete.
                     undef $aeh;
+                    $it->{done}->end;
                 }
             },
         );
@@ -175,75 +187,25 @@ when their input is exhausted.
 
 sub parsers {
     my $self = shift;
-    return $self->{count} + scalar @{ $self->{avid} };
+    return $self->{count};
 }
 
-sub _iter {
-    my $self = shift;
-
-    return sub {
-        # Drain all the non-selectable parsers first
-        if (@{ $self->{avid} } ) {
-            my ( $parser, $stash ) = @{ $self->{avid}->[0] };
-            my $result = $parser->next;
-            shift @{$self->{avid}} unless defined $result;
-            return ( $parser, $stash, $result );
-        }
-
-        # Block for the signal that we've got something to read
-        while (not @{ $self->{return} } and $self->{count} ) {
-            $self->{ready} = AnyEvent->condvar;
-            $self->{ready}->recv;
-        }
-
-        if (@{ $self->{return} }) {
-            my ($parser, $stash, $result) = @{ shift @{ $self->{return} } };
-            return ( $parser, $stash, $result );
-        }
-
-        return unless $self->{count};
-        die "No lines in the queue, but open handles?";
-    };
-}
-
-
 =head3 C<next>
 
-Return a result from the next available parser. Returns a list
-containing the parser from which the result came, the stash that
-corresponds with that parser and the result.
-
-    my ( $parser, $stash, $result ) = $mux->next;
-
-If C<$result> is undefined the corresponding parser has reached the end
-of its input (and will automatically be removed from the multiplexer).
-
-When all parsers are exhausted an empty list will be returned.
-
-    if ( my ( $parser, $stash, $result ) = $mux->next ) {
-        if ( ! defined $result ) {
-            # End of this parser
-        }
-        else {
-            # Process result
-        }
-    }
-    else {
-        # All parsers finished
-    }
+Exists to error if this classes is attempted to be used like a drop-in
+replacement for L<TAP::Parser::Multiplexer>.
 
 =cut
 
 sub next {
-    my $self = shift;
-    return ($self->{_iter} ||= $self->_iter)->();
+    die "TAP::Parser::Multiplexer::AnyEvent can only be used by TAP::Harness::AnyEvent";
 }
 
 =head1 See Also
 
-L<TAP::Parser>
+L<TAP::Harness::AnyEvent>
 
-L<TAP::Harness>
+L<TAP::Parser::Multiplexer>
 
 =cut
 

commit 98bbebee807d7228d3cbebffb928b456b02ac7ad
Author: Alex Vandiver <alexmv at bestpractical.com>
Date:   Sun May 4 14:29:03 2014 -0400

    As we suggest both -p and -P, configure GetOpt to respect case

diff --git a/bin/smokingit-worker b/bin/smokingit-worker
index 87c0f81..2829f8e 100755
--- a/bin/smokingit-worker
+++ b/bin/smokingit-worker
@@ -4,7 +4,7 @@ use strict;
 use warnings;
 use lib 'lib';
 use Smokingit::Worker;
-use Getopt::Long;
+use Getopt::Long qw(:config no_ignore_case);
 
 my $jobs    = 5;
 my $repos   = "repos";

commit 43df96bf99aa70e77e917899f33e33d43b1af64c
Author: Alex Vandiver <alexmv at bestpractical.com>
Date:   Sun May 4 14:29:21 2014 -0400

    For completenes, include a vhost option

diff --git a/bin/smokingit-worker b/bin/smokingit-worker
index 2829f8e..5006a0f 100755
--- a/bin/smokingit-worker
+++ b/bin/smokingit-worker
@@ -16,12 +16,13 @@ my $pass    = "guest";
 my $vhost   = "/";
 
 GetOptions(
-    "jobs|j=i",                   \$jobs,
-    "repo-path|repos|r=s",        \$repos,
-    "host|h=s",                   \$host,
-    "port|p=i",                   \$port,
-    "user|U=s",                   \$user,
-    "password|pass|P=s",          \$pass,
+    "jobs|j=i",            \$jobs,
+    "repo-path|repos|r=s", \$repos,
+    "host|h=s",            \$host,
+    "port|p=i",            \$port,
+    "user|U=s",            \$user,
+    "password|pass|P=s",   \$pass,
+    "vhost|v=s",           \$vhost,
 ) or die "Invalid options";
 
 die "Repository path $repos isn't writable!\n" unless -w $repos;

commit e53b29924a0d86fca8ceae3ed2f1398ae92eb7d7
Author: Alex Vandiver <alexmv at bestpractical.com>
Date:   Sun May 4 14:30:40 2014 -0400

    If previous test run times are provided, use them to order tests
    
    Parallel tests should optimally be run from longest to shortest, to
    minimize leftover time at the tail end of the testsuite.  If
    previous-test-timing information is passed, use it to order tests:
      * Unknown tests go first, as they may be long
      * Otherwise, order by test length, longest to shortest
      * Finally, fall back to asciibetical

diff --git a/lib/Smokingit/Worker.pm b/lib/Smokingit/Worker.pm
index 6b0f907..3df116c 100644
--- a/lib/Smokingit/Worker.pm
+++ b/lib/Smokingit/Worker.pm
@@ -99,6 +99,7 @@ sub run_tests {
     my $env     = $request->{env} || '';
     my $jobs    = $request->{parallel} ? $self->max_jobs : 1;
     my $tests   = $request->{test_glob} || 't/*.t';
+    my $prev    = $request->{previous};
 
     my $result = { smoke_id => $request->{smoke_id} };
 
@@ -182,10 +183,19 @@ sub run_tests {
             if $ret;
     }
 
+    # Figure out test order
+    my @tests = glob($tests);
+    if ($prev) {
+        # Put unknown tests first (alphabetically), followed by the
+        # rest, slowest to fastest.
+        @tests = sort { (defined($prev->{$a}) <=> defined($prev->{$b}))
+                     or (    ($prev->{$b}||0) <=> ($prev->{$a}||0)    )
+                     or (                  $a cmp $b                  )
+                    } @tests;
+    }
 
     # Progress indicator via Gearman
     my $done = 0;
-    my @tests = glob($tests);
     my $harness = TAP::Harness::AnyEvent->new( {
             jobs       => $jobs,
             lib        => [".", "lib"],

-----------------------------------------------------------------------


More information about the Bps-public-commit mailing list