[Bps-public-commit] smokingit-worker branch, master, updated. c07480c254a1c01d9e3677c142f8599530444952

Alex Vandiver alexmv at bestpractical.com
Tue Mar 4 00:11:46 EST 2014


The branch, master has been updated
       via  c07480c254a1c01d9e3677c142f8599530444952 (commit)
      from  e9fa5345bbe35efe1c94547642082507b935c533 (commit)

Summary of changes:
 lib/Smokingit/Worker.pm                |  25 +---
 lib/TAP/Parser/Multiplexer/AnyEvent.pm | 250 +++++++++++++++++++++++++++++++++
 2 files changed, 255 insertions(+), 20 deletions(-)
 create mode 100644 lib/TAP/Parser/Multiplexer/AnyEvent.pm

- Log -----------------------------------------------------------------
commit c07480c254a1c01d9e3677c142f8599530444952
Author: Alex Vandiver <alexmv at bestpractical.com>
Date:   Tue Mar 4 00:11:20 2014 -0500

    Switch from Coro to a real AnyEvent loop
    
    This fixes bogus -1 waitpid() returns, as well as blead-through of exit
    codes from one failed test to others running at the same time.

diff --git a/lib/Smokingit/Worker.pm b/lib/Smokingit/Worker.pm
index 6dbccc6..4966083 100644
--- a/lib/Smokingit/Worker.pm
+++ b/lib/Smokingit/Worker.pm
@@ -4,9 +4,9 @@ use warnings;
 package Smokingit::Worker;
 use base 'AnyEvent::RabbitMQ::RPC';
 
+use EV;
+use AnyEvent;
 use AnyMQ;
-use Coro;
-use Coro::AnyEvent;
 
 use TAP::Harness;
 use Storable qw( nfreeze thaw );
@@ -47,13 +47,6 @@ sub publish {
     my (%msg) = @_;
     $msg{type} = "worker_progress";
     $self->{pubsub}->topic($msg{type})->publish(\%msg);
-    Coro::AnyEvent::poll;
-}
-
-sub call {
-    my $self = shift;
-    $self->SUPER::call(@_);
-    Coro::AnyEvent::poll;
 }
 
 sub repo_path {
@@ -75,10 +68,10 @@ sub run {
         name => "run_tests",
         run  => sub {
             my %args = @_;
-            async { $self->run_tests( %args ) };
+            $self->run_tests( %args );
         },
     );
-    AE::cv->recv;
+    EV::loop;
 }
 
 my %projects;
@@ -139,7 +132,6 @@ sub run_tests {
         );
         $cleanup->();
         $args{on_failure}->( $result->{error} );
-        Coro::AnyEvent::poll;
     };
 
     # Check the SHA and check it out
@@ -185,6 +177,7 @@ sub run_tests {
             jobs       => $jobs,
             lib        => [".", "lib"],
             switches   => "-w",
+            multiplexer_class => 'TAP::Parser::Multiplexer::AnyEvent',
         } );
     $harness->diag_merge(1) if $harness->can("diag_merge");
 
@@ -229,14 +222,6 @@ sub run_tests {
             open($args->{spool}, ">", \$result->{test}{$filename}{raw_tap});
         }
     );
-    $harness->callback(
-        made_parser => sub {
-            my $parser = shift;
-            $parser->callback(
-                ALL => sub { Coro::AnyEvent::poll; }
-            );
-        }
-    );
 
     my $aggregator = eval {
         # Runtests apparently grows PERL5LIB -- local it so it doesn't
diff --git a/lib/TAP/Parser/Multiplexer/AnyEvent.pm b/lib/TAP/Parser/Multiplexer/AnyEvent.pm
new file mode 100644
index 0000000..01252af
--- /dev/null
+++ b/lib/TAP/Parser/Multiplexer/AnyEvent.pm
@@ -0,0 +1,250 @@
+package TAP::Parser::Multiplexer::AnyEvent;
+
+use strict;
+use vars qw($VERSION @ISA);
+
+use TAP::Object ();
+use AnyEvent;
+use AnyEvent::Util qw//;
+
+ at ISA = 'TAP::Object';
+
+=head1 NAME
+
+TAP::Parser::Multiplexer::AnyEvent - AnyEvent-based multiplexer for TAP
+
+=head1 VERSION
+
+Version 1.0
+
+=cut
+
+$VERSION = '1.0';
+
+=head1 SYNOPSIS
+
+    use TAP::Parser::Multiplexer::AnyEvent;
+
+    my $mux = TAP::Parser::Multiplexer->new;
+    $mux->add( $parser1, $stash1 );
+    $mux->add( $parser2, $stash2 );
+    while ( my ( $parser, $stash, $result ) = $mux->next ) {
+        # Will block waiting for input from the parsers, but still
+        # interact with other AnyEvent timers, etc
+    }
+
+=head1 DESCRIPTION
+
+L<TAP::Parser::Multiplexer> gathers input from multiple TAP::Parsers;
+this does so, but using AnyEvent as the main select loop.  A complete
+rewrite of L<TAP::Harness> to be event-driven is too complex, so this
+suffices to use the AnyEvent main loop for the main waiting-for-IO
+portion of testing.
+
+To use it, specify C<< multiplexer_class =>
+'TAP::Parser::Multiplexer::AnyEvent' >> to the L<TAP::Harness>
+constructor.  L<TAP::Harness/run_tests> will still block, but will be
+able to service AnyEvent events during its main loop.
+
+=head1 METHODS
+
+=head2 Class Methods
+
+=head3 C<new>
+
+    my $mux = TAP::Parser::Multiplexer::AnyEvent->new;
+
+Returns a new C<TAP::Parser::Multiplexer::AnyEvent> object.
+
+=cut
+
+# new() implementation supplied by TAP::Object
+
+sub _initialize {
+    my $self = shift;
+    $self->{avid}    = [];                # Parsers that can't select
+    $self->{return}  = [];
+    $self->{handles} = [];
+    $self->{count}   = 0;
+    $self->{ready}   = AnyEvent->condvar;
+
+    # AnyEvent futzes with SIGCHLD.  We split the former _finish method
+    # of TAP::Parser::Iterator::Process into two parts -- the exit-code
+    # part, and the closing-sockets part.  The latter lies in _finish,
+    # the former is installed per-child using L<AnyEvent/child>.
+    {
+        no warnings 'redefine';
+        require TAP::Parser::Iterator::Process;
+        *TAP::Parser::Iterator::Process::_finish = sub {
+            my $self = shift;
+            $self->{_next} = sub {return};
+            ( delete $self->{out} )->close;
+            ( delete $self->{err} )->close if $self->{sel};
+            delete $self->{sel} if $self->{sel};
+            $self->{teardown}->() if $self->{teardown};
+            $self->{done}->end;
+        }
+    }
+
+    return $self;
+}
+
+##############################################################################
+
+=head2 Instance Methods
+
+=head3 C<add>
+
+  $mux->add( $parser, $stash );
+
+Add a TAP::Parser to the multiplexer. C<$stash> is an optional opaque
+reference that will be returned from C<next> along with the parser and
+the next result.
+
+=cut
+
+sub add {
+    my ( $self, $parser, $stash ) = @_;
+
+    my @handles = $parser->get_select_handles;
+    unless (@handles) {
+        push @{ $self->{avid} }, [ $parser, $stash ];
+        return;
+    }
+
+    my $it = $parser->_iterator;
+    $it->{done} = AnyEvent->condvar;
+    $it->{done}->begin( sub {
+        # Once _both_ exit code and reading-from-sockets is complete,
+        # push the undef that signals "this job is done" and kick the
+        # blocked ->recv in the iterator that reads the queue
+        undef $it->{done};
+        push @{ $self->{return} }, [ $parser, $stash, undef ];
+        $self->{count}--;
+        $self->{ready}->send;
+    } );
+
+    if ($parser->_iterator->{pid}) {
+        # Add a SIGCHLD watcher that gets the exit code.
+        $it->{done}->begin;
+        my $watch; $watch = AnyEvent->child(
+            pid => $it->{pid},
+            cb  => sub {
+                my ($pid, $status) = @_;
+                undef $watch;
+                $it->{wait} = $status;
+                $it->{exit} = $it->_wait2exit($status);
+                $it->{done}->end;
+            },
+        );
+    }
+
+    for my $h (@handles) {
+        my $aeh; $aeh = AnyEvent->io(
+            fh => $h,
+            poll => "r",
+            cb => sub {
+                # If the filehandle has something to read, parse it
+                my $result = $parser->next;
+                if ($result) {
+                    # Not EOF?  Push onto the queue, and notify the
+                    # iterator that we just topped it off.
+                    push @{ $self->{return} },
+                        [ $parser, $stash, $result ];
+                    $self->{ready}->send;
+                } else {
+                    # If this is the end of the line, remove the
+                    # watcher.  We _don't_ push the "we're done" undef,
+                    # because we need the exit code first.
+                    undef $aeh;
+                }
+            },
+        );
+    }
+    $self->{count}++;
+}
+
+=head3 C<parsers>
+
+  my $count   = $mux->parsers;
+
+Returns the number of parsers. Parsers are removed from the multiplexer
+when their input is exhausted.
+
+=cut
+
+sub parsers {
+    my $self = shift;
+    return $self->{count} + scalar @{ $self->{avid} };
+}
+
+sub _iter {
+    my $self = shift;
+
+    return sub {
+        # Drain all the non-selectable parsers first
+        if (@{ $self->{avid} } ) {
+            my ( $parser, $stash ) = @{ $self->{avid}->[0] };
+            my $result = $parser->next;
+            shift @{$self->{avid}} unless defined $result;
+            return ( $parser, $stash, $result );
+        }
+
+        # Block for the signal that we've got something to read
+        while (not @{ $self->{return} } and $self->{count} ) {
+            $self->{ready} = AnyEvent->condvar;
+            $self->{ready}->recv;
+        }
+
+        if (@{ $self->{return} }) {
+            my ($parser, $stash, $result) = @{ shift @{ $self->{return} } };
+            return ( $parser, $stash, $result );
+        }
+
+        return unless $self->{count};
+        die "No lines in the queue, but open handles?";
+    };
+}
+
+
+=head3 C<next>
+
+Return a result from the next available parser. Returns a list
+containing the parser from which the result came, the stash that
+corresponds with that parser and the result.
+
+    my ( $parser, $stash, $result ) = $mux->next;
+
+If C<$result> is undefined the corresponding parser has reached the end
+of its input (and will automatically be removed from the multiplexer).
+
+When all parsers are exhausted an empty list will be returned.
+
+    if ( my ( $parser, $stash, $result ) = $mux->next ) {
+        if ( ! defined $result ) {
+            # End of this parser
+        }
+        else {
+            # Process result
+        }
+    }
+    else {
+        # All parsers finished
+    }
+
+=cut
+
+sub next {
+    my $self = shift;
+    return ($self->{_iter} ||= $self->_iter)->();
+}
+
+=head1 See Also
+
+L<TAP::Parser>
+
+L<TAP::Harness>
+
+=cut
+
+1;

-----------------------------------------------------------------------



More information about the Bps-public-commit mailing list