[svk-commit] r2107 - in branches/mirror-pipeline: lib/SVK/Editor lib/SVK/Mirror/Backend

clkao at bestpractical.com clkao at bestpractical.com
Sat Nov 4 16:25:42 EST 2006


Author: clkao
Date: Sat Nov  4 16:25:39 2006
New Revision: 2107

Added:
   branches/mirror-pipeline/lib/SVK/Editor/Serialize.pm
Removed:
   branches/mirror-pipeline/utils/replay-pipeline.pl
Modified:
   branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNSync.pm

Log:
Move pipeline code into ::SVNSync so we don't exec

Added: branches/mirror-pipeline/lib/SVK/Editor/Serialize.pm
==============================================================================
--- (empty file)
+++ branches/mirror-pipeline/lib/SVK/Editor/Serialize.pm	Sat Nov  4 16:25:39 2006
@@ -0,0 +1,51 @@
+package SVK::Editor::Serialize;
+use base 'SVK::Editor';
+
+__PACKAGE__->mk_accessors(qw(cb_serialize_entry));
+
+sub AUTOLOAD {
+    my ($self, @arg) = @_;
+    my $func = our $AUTOLOAD;
+    $func =~ s/^.*:://;
+    return if $func =~ m/^[A-Z]+$/;
+    my $baton;
+    pop @arg if ref ($arg[-1]) =~ m/^(?:SVN::Pool|_p_apr_pool_t)$/;
+
+    warn "==> starting " if $func eq 'open_root';
+
+    if ((my $baton_at = $self->baton_at ($func)) >= 0) {
+	$baton = $arg[$baton_at];
+    }
+    else {
+	$baton = 0;
+    }
+
+    my $ret = $func =~ m/^(?:add|open)/ ? ++$self->{batons} : undef;
+    Carp::cluck unless defined $func;
+    $self->cb_serialize_entry->([$ret, $func, @arg]);
+    return $ret;
+}
+
+my $apply_textdelta_entry;
+
+sub close_file {
+    my ($self, $baton, $checksum) = @_;
+    if ($apply_textdelta_entry) {
+	$self->cb_serialize_entry->($apply_textdelta_entry);
+	$apply_textdelta_entry = undef;
+    }
+    $self->cb_serialize_entry->([undef, 'close_file', $baton, $checksum]);
+}
+
+sub apply_textdelta {
+    my ($self, $baton, @arg) = @_;
+    pop @arg if ref ($arg[-1]) =~ m/^(?:SVN::Pool|_p_apr_pool_t)$/;
+    my $entry = [undef, 'apply_textdelta', $baton, @arg, ''];
+    open my ($svndiff), '>', \$entry->[-1];
+#    $self->cb_serialize_entry->($entry);
+    $apply_textdelta_entry = $entry;
+    return [SVN::TxDelta::to_svndiff($svndiff)];
+}
+
+
+1;

Modified: branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNSync.pm
==============================================================================
--- branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNSync.pm	(original)
+++ branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNSync.pm	Sat Nov  4 16:25:39 2006
@@ -98,9 +98,15 @@
 
 =cut
 
+sub get_pipeline_editor_fh {
+    my ($self, $gen) = @_;
+    my $ra = $self->_new_ra;
+
+    return SVK::Mirror::Backend::SVNSync::Async->new($self, $ra, $gen);
+}
+
 sub mirror_changesets {
     my ( $self, $torev, $callback ) = @_;
-
     $self->mirror->with_lock( 'mirror',
         sub {
 	    $self->refresh;
@@ -108,11 +114,8 @@
 	    my $ra = $self->_new_ra;
 	    $torev ||= $ra->get_latest_revnum;
 	    $self->_ra_finished($ra);
-	    warn $self->mirror->depot->repospath;
-	    warn "$from $torev";
-	    open my $fh, '-|', "perl -Ilib utils/replay-pipeline.pl ".$self->mirror->depot->repospath." / $from $torev 2>stderr " or die $!;
-	    require IO::Handle;
-#	    $fh->blocking(0);
+	    my @revs = ($from..$torev);
+	    my $fh = $self->get_pipeline_editor_fh(sub { shift @revs });
             $self->traverse_new_changesets(
                 sub { $self->evil_sync_changeset( @_, $fh, $callback ) }, $torev );
         }
@@ -124,11 +127,9 @@
     my ($self, $editor, $func, $pool, @arg) = @_;
     my ($ret, $baton_at);
     if ($func eq 'apply_textdelta') {
-#	$pool->default;
 	my $svndiff = pop @arg;
 	$ret = $editor->apply_textdelta(@arg, $pool);
 	if ($ret && $#$ret > 0) {
-#	    warn md5_hex($svndiff);
 	    my $stream = SVN::TxDelta::parse_svndiff(@$ret, 1, $pool);
 	    print $stream $svndiff;
 	    close $stream;
@@ -159,7 +160,6 @@
     my $pool;
     while (my $data = read_msg($fh)) {
 	my $x = thaw($$data);
-#	Carp::cluck (length $$data ) unless defined $x;
 	my ($next, $func, @arg) = @$x;
 	my $baton_at = SVK::Editor->baton_at($func);
 	my $baton = $arg[$baton_at];
@@ -181,7 +181,6 @@
 	    $baton_pool->{$next} = SVN::Pool->new_default;
 	    $baton_map->{$next} = $ret
 	}
-#	warn "==> close edit" if $func eq 'close_edit';
     }
 }
 
@@ -220,15 +219,121 @@
         }
     );
 
-#    warn "==> begin evil $changeset";
     $self->_read_evil_replay($editor, $fh);
     $self->_ra_finished($ra);
-#die;
     return;
 
 }
 
-
 sub _relayed { }
 
+package SVK::Mirror::Backend::SVNSync::Async;
+use SVK::Editor::Serialize;
+use Socket;
+
+my $max_editor_in_buf = 5;
+my $current_editors = 0;
+my $unsent_buf = '';
+
+my $buf;
+my $max;
+
+use IO::Handle;
+use Storable 'nfreeze';
+use POSIX 'EPIPE';
+
+sub on_close_edit {
+    --$current_editors;
+}
+
+sub try_flush {
+    my $fh = shift;
+    my $wait = shift;
+    my $max_write = $wait ? -1 : 10;
+    if ($wait) {
+	$fh->blocking(1);
+    }
+    else {
+	$fh->blocking(0);
+	my $wstate = '';
+	vec($wstate,fileno($fh),1) = 1;
+	select(undef, $wstate, undef, 0);;
+	return unless vec($wstate,fileno($fh),1);
+
+    }
+    my $i = 0;
+    while ( 
+	    $#{$buf} >= 0 || length($unsent_buf) ) {
+	if (my $len = length $unsent_buf) {
+	    if (my $ret = syswrite($fh, $unsent_buf)) {
+		substr($unsent_buf, 0, $ret, '');
+		last if $ret != $len;
+	    }
+	    else {
+		die if $! == EPIPE;
+		return;
+	    }
+	}
+	last if $#{$buf} < 0;
+	use Carp;
+	Carp::cluck unless defined $buf->[0];
+	my $msg = nfreeze($buf->[0]);
+	$msg = pack('N', length($msg)).$msg;
+
+	if (my $ret = syswrite($fh, $msg)) {
+	    $unsent_buf .= substr($msg, $ret)  if length($msg) != $ret;
+	    on_close_edit() if (shift @$buf)->[1] eq 'close_edit';
+	}
+	else {
+	    die if $! == EPIPE;
+	    # XXX: check $! for fatal
+	    last;
+	}
+    }
+}
+
+sub entry {
+    my $entry = shift;
+    push @$buf, $entry;
+}
+
+sub new {
+    my ($self, $svnsync, $ra, $gen) = @_;
+    socketpair(my $c, my $p, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
+	or  die "socketpair: $!";
+
+    if (my $pid = fork) {
+	close $p;
+	return $c;
+    }
+    else {
+	die "cannot fork: $!" unless defined $pid;
+	close $c;
+    }
+
+    my $pool = SVN::Pool->new_default;
+    while (my $changeset = $gen->()) {
+	$pool->clear;
+	while ($current_editors > $max_editor_in_buf) {
+	    warn "waiting for flush for $changeset.. ($current_editors).";
+	    try_flush($p, 1);
+	}
+
+	++$current_editors;
+	warn "replay $changeset ($current_editors)";
+	$ra->replay($changeset, 0, 1,# SVK::Editor->new(_debug=>1));
+		    SVK::Editor::Serialize->new({ cb_serialize_entry =>
+						  sub { entry(@_); try_flush($p) } }));
+	entry([undef, 'close_edit']);
+	try_flush($p);
+    }
+
+    while ($#{$buf} >= 0) {
+	try_flush($p, 1) ;
+    }
+    exit;
+}
+
+
+
 1;


More information about the svk-commit mailing list