[svk-commit] r2117 - in branches/mirror-pipeline: .

clkao at bestpractical.com clkao at bestpractical.com
Mon Nov 6 04:12:26 EST 2006


Author: clkao
Date: Mon Nov  6 04:12:26 2006
New Revision: 2117

Added:
   branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRaPipe.pm
Modified:
   branches/mirror-pipeline/   (props changed)
   branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNSync.pm

Log:
 r7222 at ubuntu:  clkao | 2006-11-06 02:50:38 +0000
 put generalised pipeline ra in separate module to be used by svnra backend
 as well.
 


Added: branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRaPipe.pm
==============================================================================
--- (empty file)
+++ branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRaPipe.pm	Mon Nov  6 04:12:26 2006
@@ -0,0 +1,229 @@
+package SVK::Mirror::Backend::SVNRaPipe;
+use strict;
+
+use base 'Class::Accessor::Fast';
+__PACKAGE__->mk_accessors(qw(ra requests fh unsent_buf buf_call current_editors));
+
+use POSIX 'EPIPE';
+use Socket;
+
+
+=head1 NAME
+
+SVK::Mirror::Backend::SVNRaPipe - pipelined RA requests served by a forked child
+
+=head1 SYNOPSIS
+
+ my @req = (['rev_proplist', 3], ['replay', 3, 0, 1, 'EDITOR']);
+ $generator = sub { shift @req };
+ $pra = SVK::Mirror::Backend::SVNRaPipe->new($ra, $generator);
+
+ $pra->rev_proplist(3);
+ $pra->replay(3, 0, 1, SVK::Editor->new);
+
+=head1 DESCRIPTION
+
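+The constructor forks a child that runs the generated RA requests and streams the results back over a socketpair.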
+
+=cut
+
+sub entry {    # queue one entry; try_flush() later writes it to the socket
+    my $self = shift;
+    push @{ $self->buf_call }, shift;
+}
+
+# Write queued entries to the socket as frames: a 4-byte network-order
+# length followed by the nfreeze()d entry.
+#
+# With a true $wait the socket is put in blocking mode; otherwise it is made
+# non-blocking and the call returns at once unless select() reports it
+# writable.  The unsent tail of a partly written frame is kept in unsent_buf
+# and goes out before any further frame.  Dies on EPIPE.
+sub try_flush {
+    my ($self, $wait) = @_;
+    my $fh = $self->fh;
+
+    $fh->blocking($wait ? 1 : 0);
+    unless ($wait) {
+        my $wstate = '';
+        vec($wstate, fileno($fh), 1) = 1;
+        select(undef, $wstate, undef, 0);
+        return unless vec($wstate, fileno($fh), 1);
+    }
+
+    my $queue = $self->buf_call;
+    while (@$queue || length $self->unsent_buf) {
+        # Finish the tail of the previous frame first.
+        if (my $len = length $self->unsent_buf) {
+            my $sent = syswrite($fh, $self->unsent_buf);
+            unless ($sent) {
+                die "try_flush: peer closed the pipe: $!" if $! == EPIPE;
+                return;
+            }
+            substr($self->{unsent_buf}, 0, $sent, '');
+            last if $sent != $len;
+        }
+        last unless @$queue;
+
+        my $msg = nfreeze($queue->[0]);
+        $msg = pack('N', length $msg) . $msg;
+        my $sent = syswrite($fh, $msg);
+        unless ($sent) {
+            die "try_flush: peer closed the pipe: $!" if $! == EPIPE;
+            last;    # XXX: check $! for other fatal errors
+        }
+        $self->{unsent_buf} .= substr($msg, $sent) if $sent != length $msg;
+        # A close_edit leaving the queue means one fewer buffered editor.
+        --$self->{current_editors} if (shift @$queue)->[1] eq 'close_edit';
+    }
+}
+
+
+# Constructor.  $gen returns the next request as [$method, @args] (false when
+# done); the string 'EDITOR' among the args stands for an editor argument.
+# The process forks: the parent receives the object and later consumes the
+# same request sequence from its copy of $gen via rev_proplist()/replay().
+# The child never returns; it runs each request on $ra, queues the results
+# as frames for the socketpair and exits once everything has been written.
+sub new {
+    my ($class, $ra, $gen) = @_;
+
+    socketpair(my $c, my $p, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
+        or die "socketpair: $!";
+    my $self = $class->SUPER::new({
+        ra              => $ra,
+        requests        => $gen,
+        fh              => $c,
+        current_editors => 0,
+        buf_call        => [],
+        unsent_buf      => '',
+    });
+
+    my $pid = fork;
+    die "cannot fork: $!" unless defined $pid;
+    if ($pid) {
+        close $p;
+        return $self;
+    }
+
+    # Child: execute the requests and buffer their results for the parent.
+    close $c;
+    $self->fh($p);
+    require SVK::Editor::Serialize;
+    my $max_editor_in_buf = 5;
+    my $pool = SVN::Pool->new_default;
+    my $queue_and_flush = sub { $self->entry(@_); $self->try_flush };
+    while (my $req = $gen->()) {
+        $pool->clear;
+        my ($cmd, @arg) = @$req;
+        @arg = map {
+            $_ eq 'EDITOR'
+                ? SVK::Editor::Serialize->new({ cb_serialize_entry => $queue_and_flush })
+                : $_
+        } @arg;
+
+        # Throttle: block until enough buffered replays have been written out.
+        $self->try_flush(1) while $self->current_editors > $max_editor_in_buf;
+
+        my $ret = $self->ra->$cmd(@arg);
+        my $is_replay = $cmd eq 'replay';    # XXX or other requests using editors
+        ++$self->{current_editors} if $is_replay;
+        $self->entry($is_replay ? [undef, 'close_edit'] : [$ret, $cmd]);
+        $self->try_flush;
+    }
+
+    # Drain: the tail of a partly written frame lives in unsent_buf, not in
+    # buf_call, so both must be empty before the child may exit.
+    $self->try_flush(1) while @{ $self->buf_call } || length $self->unsent_buf;
+    exit;
+}
+
+# Client code reading pipelined responses
+
+use Storable qw(nfreeze thaw);
+
+sub read_msg {    # one frame = 4-byte 'N' length + payload; returns \$payload
+    my $self = shift;
+    my ($len, $msg);
+    (read($self->fh, $len, 4) || 0) == 4 or die "read_msg: short header: $!";
+    $len = unpack 'N', $len;
+    (read($self->fh, $msg, $len) || 0) == $len or die "read_msg: short body: $!";
+    return \$msg;
+}
+
+# Match a client call against the next generated request, dying on mismatch.
+sub ensure_client_cmd {
+    my ($self, @arg) = @_;
+    my $req = $self->requests->() or die "no pending request for: @arg\n";
+    for my $want (@$req) {
+        my $got = shift @arg;
+        if ($want eq 'EDITOR') {    # placeholder: hand back the caller's editor
+            die "expected an SVK::Editor\n" unless UNIVERSAL::isa($got, 'SVK::Editor');
+            return $got;
+        }
+        die "request mismatch: expected $want\n" if $want ne $got;
+    }
+    die join(',', @arg) if @arg;    # arguments beyond what the request lists
+}
+
+# Synchronous request: returns the child's result for the next rev_proplist.
+sub rev_proplist {
+    my $self = shift;
+    $self->ensure_client_cmd('rev_proplist', @_);
+    my ($ret, $cmd) = @{ thaw( ${ $self->read_msg } ) };
+    die "pipeline out of sync: expected a rev_proplist reply\n" unless defined $cmd && $cmd eq 'rev_proplist';
+    return $ret;
+}
+
+
+# Replay one request: check the call against the expected request, then feed
+# the editor calls streamed by the child into the caller's editor until
+# 'close_edit' has been delivered.
+sub replay {
+    my $self   = shift;
+    my $editor = $self->ensure_client_cmd('replay', @_);
+    # Both keyed by the baton ids used in the stream: the value the local
+    # editor returned for that baton, and a pool held until it is closed.
+    my (%local_baton, %pool_for);
+
+    while (my $frame = $self->read_msg) {
+        my ($new_id, $func, @arg) = @{ thaw($$frame) };
+        my $slot = SVK::Editor->baton_at($func);
+        my $id   = $arg[$slot];
+        $arg[$slot] = $local_baton{$id} if $slot >= 0;
+
+        my $ret = $self->emit_editor_call($editor, $func, undef, @arg);
+        last if $func eq 'close_edit';
+
+        if ($func =~ m/^close/) {
+            Carp::cluck $func unless $local_baton{$id};
+            delete $local_baton{$id};
+            delete $pool_for{$id};
+        }
+        next unless $new_id;
+
+        $pool_for{$new_id}    = SVN::Pool->new_default;
+        $local_baton{$new_id} = $ret;
+    }
+}
+
+# Call $func on $editor with (@arg, $pool) and return the result.  For
+# 'apply_textdelta' the last @arg element is svndiff data: it is removed and,
+# when the result holds more than one element, printed to the parse_svndiff stream.
+sub emit_editor_call {
+    my ($self, $editor, $func, $pool, @arg) = @_;
+    unless ($func eq 'apply_textdelta') {
+        my $ret = $editor->$func(@arg, $pool);
+        return $ret;
+    }
+    my $svndiff = pop @arg;
+    my $handler = $editor->apply_textdelta(@arg, $pool);
+    if ($handler && $#$handler > 0) {
+        my $stream = SVN::TxDelta::parse_svndiff(@$handler, 1, $pool);
+        print $stream $svndiff;
+        close $stream;
+    }
+    return $handler;
+}
+
+1;

Modified: branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNSync.pm
==============================================================================
--- branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNSync.pm	(original)
+++ branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNSync.pm	Mon Nov  6 04:12:26 2006
@@ -99,92 +99,29 @@
 
 =cut
 
-sub get_pipeline_editor_fh {
-    my ($self, $gen) = @_;
-    my $ra = $self->_new_ra;
-
-    return SVK::Mirror::Backend::SVNSync::Async->new($self, $ra, $gen);
-}
-
 sub mirror_changesets {
     my ( $self, $torev, $callback ) = @_;
     $self->mirror->with_lock( 'mirror',
         sub {
 	    $self->refresh;
-	    my $from = ($self->fromrev || 0)+1;
-	    my $ra = $self->_new_ra;
-	    $torev ||= $ra->get_latest_revnum;
-	    $self->_ra_finished($ra);
-	    my @revs = ($from..$torev);
-	    my $fh = $self->get_pipeline_editor_fh(sub { shift @revs });
-            $self->traverse_new_changesets(
-                sub { $self->evil_sync_changeset( @_, $fh, $callback ) }, $torev );
+	    my @revs;
+            $self->traverse_new_changesets( sub { push @revs, [@_] }, $torev );
+	    # prepare generator for pipelined ra
+	    my @gen;
+	    for ($revs[0][0]..$revs[-1][0]) {
+		push @gen, ['rev_proplist', $_], ['replay', $_, 0, 1, 'EDITOR'];
+	    }
+	    require SVK::Mirror::Backend::SVNRaPipe;
+	    my $pra = SVK::Mirror::Backend::SVNRaPipe->new($self->_new_ra, sub { shift @gen });
+	    for (@revs) {
+		$self->evil_sync_changeset( @$_, $pra, $callback );
+	    }
         }
     );
 }
 
-sub emit {
-    my ($self, $editor, $func, $pool, @arg) = @_;
-    my ($ret, $baton_at);
-    if ($func eq 'apply_textdelta') {
-	my $svndiff = pop @arg;
-	$ret = $editor->apply_textdelta(@arg, $pool);
-	if ($ret && $#$ret > 0) {
-	    my $stream = SVN::TxDelta::parse_svndiff(@$ret, 1, $pool);
-	    print $stream $svndiff;
-	    close $stream;
-	}
-    }
-    else {
-	$ret = $editor->$func(@arg, $pool);
-    }
-    return $ret;
-}
-
-use Storable 'thaw';
-
-sub read_msg {
-    my ($sock) = @_;
-    my ($len, $msg);
-    read $sock, $len, 4 or die $!;
-    $len = unpack ('N', $len);
-    my $rlen = read $sock, $msg, $len or die $!;
-    return \$msg;
-}
-
-sub _read_evil_replay {
-    my ($self, $editor, $fh) = @_;
-    my $baton_map = {};
-    my $baton_pool = {};
-
-    while (my $data = read_msg($fh)) {
-	my $x = thaw($$data);
-	my ($next, $func, @arg) = @$x;
-	my $baton_at = SVK::Editor->baton_at($func);
-	my $baton = $arg[$baton_at];
-	if ($baton_at >= 0) {
-	    $arg[$baton_at] = $baton_map->{$baton};
-	}
-
-	my $ret = $self->emit($editor, $func, undef, @arg);
-
-	last if $func eq 'close_edit';
-
-	if ($func =~ m/^close/) {
-	    Carp::cluck $func unless $baton_map->{$baton};
-	    delete $baton_map->{$baton};
-	    delete $baton_pool->{$baton};
-	}
-
-	if ($next) {
-	    $baton_pool->{$next} = SVN::Pool->new_default;
-	    $baton_map->{$next} = $ret
-	}
-    }
-}
-
 sub evil_sync_changeset {
-    my ( $self, $changeset, $metadata, $fh, $callback ) = @_;
+    my ( $self, $changeset, $metadata, $ra, $callback ) = @_;
     my $t = $self->mirror->get_svkpath('/');
     my ( $editor, undef, %opt ) = $t->get_editor(
         ignore_mirror => 1,
@@ -198,7 +135,6 @@
         }
     );
 
-    my $ra = $self->_new_ra;
     my $pool = SVN::Pool->new_default;
     if ( my $revprop = $self->mirror->depot->mirror->revprop ) {
         my $prop = $ra->rev_proplist($changeset);
@@ -217,119 +153,11 @@
             return $t->as_url( 1, $path, $rev );
         }
     );
-
-    $self->_read_evil_replay($editor, $fh);
-    $self->_ra_finished($ra);
+    $ra->replay( $changeset, 0, 1, $editor );
     return;
 
 }
 
 sub _relayed { }
 
-package SVK::Mirror::Backend::SVNSync::Async;
-use SVK::Editor::Serialize;
-use Socket;
-
-my $max_editor_in_buf = 5;
-my $current_editors = 0;
-my $unsent_buf = '';
-
-my $buf;
-my $max;
-
-use IO::Handle;
-use Storable 'nfreeze';
-use POSIX 'EPIPE';
-
-sub on_close_edit {
-    --$current_editors;
-}
-
-sub try_flush {
-    my $fh = shift;
-    my $wait = shift;
-    my $max_write = $wait ? -1 : 10;
-    if ($wait) {
-	$fh->blocking(1);
-    }
-    else {
-	$fh->blocking(0);
-	my $wstate = '';
-	vec($wstate,fileno($fh),1) = 1;
-	select(undef, $wstate, undef, 0);;
-	return unless vec($wstate,fileno($fh),1);
-
-    }
-    my $i = 0;
-    while ( $#{$buf} >= 0 || length($unsent_buf) ) {
-	if (my $len = length $unsent_buf) {
-	    if (my $ret = syswrite($fh, $unsent_buf)) {
-		substr($unsent_buf, 0, $ret, '');
-		last if $ret != $len;
-	    }
-	    else {
-		die if $! == EPIPE;
-		return;
-	    }
-	}
-	last if $#{$buf} < 0;
-	use Carp;
-	Carp::cluck unless defined $buf->[0];
-	my $msg = nfreeze($buf->[0]);
-	$msg = pack('N', length($msg)).$msg;
-
-	if (my $ret = syswrite($fh, $msg)) {
-	    $unsent_buf .= substr($msg, $ret)  if length($msg) != $ret;
-	    on_close_edit() if (shift @$buf)->[1] eq 'close_edit';
-	}
-	else {
-	    die if $! == EPIPE;
-	    # XXX: check $! for fatal
-	    last;
-	}
-    }
-}
-
-sub entry {
-    my $entry = shift;
-    push @$buf, $entry;
-}
-
-sub new {
-    my ($self, $svnsync, $ra, $gen) = @_;
-    socketpair(my $c, my $p, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
-	or  die "socketpair: $!";
-
-    if (my $pid = fork) {
-	close $p;
-	return $c;
-    }
-    else {
-	die "cannot fork: $!" unless defined $pid;
-	close $c;
-    }
-
-    my $pool = SVN::Pool->new_default;
-    while (my $changeset = $gen->()) {
-	$pool->clear;
-	while ($current_editors > $max_editor_in_buf) {
-	    try_flush($p, 1);
-	}
-
-	++$current_editors;
-	$ra->replay($changeset, 0, 1,# SVK::Editor->new(_debug=>1));
-		    SVK::Editor::Serialize->new({ cb_serialize_entry =>
-						  sub { entry(@_); try_flush($p) } }));
-	entry([undef, 'close_edit']);
-	try_flush($p);
-    }
-
-    while ($#{$buf} >= 0) {
-	try_flush($p, 1) ;
-    }
-    exit;
-}
-
-
-
 1;


More information about the svk-commit mailing list