[svk-commit] r2131 - in trunk: lib/SVK/Editor lib/SVK/Mirror/Backend lib/SVK/Path

clkao at bestpractical.com clkao at bestpractical.com
Fri Nov 10 11:11:53 EST 2006


Author: clkao
Date: Fri Nov 10 11:11:52 2006
New Revision: 2131

Added:
   trunk/lib/SVK/Editor/Serialize.pm
   trunk/lib/SVK/Mirror/Backend/SVNRaPipe.pm
Modified:
   trunk/   (props changed)
   trunk/MANIFEST
   trunk/lib/SVK/Mirror/Backend/SVNRa.pm
   trunk/lib/SVK/Mirror/Backend/SVNSync.pm
   trunk/lib/SVK/Path/View.pm

Log:
Merge the mirror pipeline branch to trunk.

This makes svk use a separate process to buffer ra_replay calls,
so we are not bound by network and local-fs-write latency.  There
have been reports of a 2x speedup for svk sync.


Modified: trunk/MANIFEST
==============================================================================
--- trunk/MANIFEST	(original)
+++ trunk/MANIFEST	Fri Nov 10 11:11:52 2006
@@ -78,6 +78,7 @@
 lib/SVK/Editor/Merge.pm
 lib/SVK/Editor/Patch.pm
 lib/SVK/Editor/Rename.pm
+lib/SVK/Editor/Serialize.pm
 lib/SVK/Editor/Sign.pm
 lib/SVK/Editor/Status.pm
 lib/SVK/Editor/SubTree.pm
@@ -117,6 +118,7 @@
 lib/SVK/MimeDetect/Internal.pm
 lib/SVK/Mirror.pm
 lib/SVK/Mirror/Backend/SVNRa.pm
+lib/SVK/Mirror/Backend/SVNRaPipe.pm
 lib/SVK/Mirror/Backend/SVNSync.pm
 lib/SVK/MirrorCatalog.pm
 lib/SVK/Notify.pm

Added: trunk/lib/SVK/Editor/Serialize.pm
==============================================================================
--- (empty file)
+++ trunk/lib/SVK/Editor/Serialize.pm	Fri Nov 10 11:11:52 2006
@@ -0,0 +1,57 @@
+package SVK::Editor::Serialize;
+use strict;
+use base 'SVK::Editor';
+
+__PACKAGE__->mk_accessors(qw(cb_serialize_entry));
+
+# An editor that records each call it receives as a plain array entry,
+#   [ $new_baton_id, $method_name, @args ],
+# and passes it to cb_serialize_entry.  Batons handed back to the driver
+# are integers; each add_* or open_* call allocates the next one.  A
+# trailing pool argument is never recorded.
+
+# Remove a trailing SVN pool from the argument list in @$args, if present.
+sub _strip_pool {
+    my $args = shift;
+    pop @$args
+        if @$args && ref( $args->[-1] ) =~ m/^(?:SVN::Pool|_p_apr_pool_t)$/;
+    return;
+}
+
+sub AUTOLOAD {
+    my ($self, @arg) = @_;
+    my $func = our $AUTOLOAD;
+    $func =~ s/^.*:://;
+    # DESTROY and other all-caps special methods are not editor calls.
+    return if $func =~ m/^[A-Z]+$/;
+    _strip_pool(\@arg);
+
+    my $ret = $func =~ m/^(?:add|open)/ ? ++$self->{batons} : undef;
+    $self->cb_serialize_entry->([$ret, $func, @arg]);
+    return $ret;
+}
+
+# The svndiff only exists after the driver has pushed every window into
+# the handler returned here, so the entry -- whose last element collects
+# the svndiff stream -- is kept on this editor and emitted just before
+# the matching close_file.
+sub apply_textdelta {
+    my ($self, $baton, @arg) = @_;
+    _strip_pool(\@arg);
+    my $entry = [undef, 'apply_textdelta', $baton, @arg, ''];
+    open my $svndiff, '>', \$entry->[-1]
+        or die "cannot open in-memory svndiff buffer: $!";
+    $self->{_pending_textdelta} = $entry;
+    return [SVN::TxDelta::to_svndiff($svndiff)];
+}
+
+sub close_file {
+    my ($self, $baton, $checksum) = @_;
+    if (my $entry = delete $self->{_pending_textdelta}) {
+        $self->cb_serialize_entry->($entry);
+    }
+    $self->cb_serialize_entry->([undef, 'close_file', $baton, $checksum]);
+    return;
+}
+
+1;

Modified: trunk/lib/SVK/Mirror/Backend/SVNRa.pm
==============================================================================
--- trunk/lib/SVK/Mirror/Backend/SVNRa.pm	(original)
+++ trunk/lib/SVK/Mirror/Backend/SVNRa.pm	Fri Nov 10 11:11:52 2006
@@ -6,6 +6,8 @@
 use SVN::Ra;
 use SVK::I18N;
 use SVK::Editor;
+use SVK::Mirror::Backend::SVNRaPipe;
+
 use Class::Autouse qw(SVK::Editor::SubTree SVK::Editor::CopyHandler);
 
 ## class SVK::Mirror::Backend::SVNRa;
@@ -19,7 +21,7 @@
 
 # for this: things without _'s will probably move to base
 # SVK::Mirror::Backend
-__PACKAGE__->mk_accessors(qw(mirror _config _auth_baton _auth_ref _auth_baton source_root source_path fromrev _has_replay _cached_ra));
+__PACKAGE__->mk_accessors(qw(mirror _config _auth_baton _auth_ref source_root source_path fromrev _has_replay _cached_ra use_pipeline));
 
 =head1 NAME
 
@@ -51,7 +53,7 @@
 
 sub load {
     my ($class, $mirror) = @_;
-    my $self = $class->SUPER::new( { mirror => $mirror } );
+    my $self = $class->SUPER::new( { mirror => $mirror, use_pipeline => 1 } );
     my $t = $mirror->get_svkpath;
     die loc( "%1 is not a mirrored path.\n", $t->depotpath )
         unless $t->root->check_path( $mirror->path );
@@ -82,7 +84,7 @@
 sub create {
     my ($class, $mirror, $backend, $args, $txn, $editor) = @_;
 
-    my $self = $class->SUPER::new({ mirror => $mirror });
+    my $self = $class->SUPER::new({ mirror => $mirror, use_pipeline => 1 });
 
     my $ra = $self->_new_ra;
 
@@ -253,6 +255,7 @@
 sub _ra_finished {
     my ($self, $ra) = @_;
     return if $self->_cached_ra;
+    return if ref($ra) eq 'SVK::Mirror::Backend::SVNRaPipe';    # a pipelined ra only serves its pre-generated requests (and owns a forked child); never cache it for reuse
     $self->_cached_ra( $ra );
 }
 
@@ -326,8 +329,12 @@
     die $@ if $@;
 }
 
+=item sync_changeset($changeset, $metadata, $ra, $extra_prop, $callback )
+
+=cut
+
 sub sync_changeset {
-    my ($self, $changeset, $metadata, $callback) = @_;
+    my ( $self, $changeset, $metadata, $ra, $extra_prop, $callback ) = @_;
     my $t = $self->mirror->get_svkpath;
     my ( $editor, undef, %opt ) = $t->get_editor(
         ignore_mirror => 1,
@@ -340,18 +347,41 @@
             $callback->( $changeset, $_[0] ) if $callback;
         }
     );
-    # XXX: sync relayed revmap as well
-    $opt{txn}->change_prop('svm:headrev', $self->mirror->server_uuid.":$changeset\n");
 
-    my $ra = $self->_new_ra;
-    if ( my $revprop = $self->mirror->depot->mirror->revprop ) {
-        my $prop = $ra->rev_proplist($changeset);
-        for (@$revprop) {
-            $opt{txn}->change_prop( $_, $prop->{$_} )
-                if exists $prop->{$_};
+    for (keys %$extra_prop) {
+	$opt{txn}->change_prop( $_, $extra_prop->{$_} );
+    }
+    $self->_revmap_prop( $opt{txn}, $changeset );
+
+    $editor = $self->_get_sync_editor($editor, $t);
+    $ra->replay( $changeset, 0, 1, $editor );
+    $self->_after_replay($ra, $editor);
+
+    return;
+
+}
+
+sub _after_replay {    # finish the edit after $ra->replay has driven $editor
+    my ($self, $ra, $editor) = @_;
+    if ( $editor->isa('SVK::Editor::SubTree') ) {    # presumably the wrapper _get_sync_editor adds for a non-root source_path -- confirm
+	my $baton = $editor->anchor_baton;
+        if ( $editor->needs_touch ) {
+            $editor->change_dir_prop( $baton, 'svk:mirror' => undef );
         }
+	if (!$editor->changes) {    # nothing changed under the subtree: abort instead of closing the edit
+	    $editor->abort_edit;
+	    return;
+	}
+        $editor->close_directory($baton);    # close the anchor directory baton
     }
 
+    $editor->close_edit;
+    return;
+
+}
+
+sub _get_sync_editor {
+    my ($self, $editor, $target) = @_;
     $editor = SVK::Editor::CopyHandler->new(
         _editor => $editor,
         cb_copy => sub {
@@ -359,7 +389,7 @@
             return ( $path, $rev ) if $rev == -1;
             my $source_path = $self->source_path;
             $path =~ s/^\Q$self->{source_path}//;
-            return $t->as_url(
+            return $target->as_url(
                 1,
                 $self->mirror->path . $path,
                 $self->find_rev_from_changeset($rev)
@@ -370,7 +400,6 @@
     # ra->replay gives us editor calls based on repos root not
     # base uri, so we need to get the correct subtree.
     my $baton;
-    my $pool = SVN::Pool->new_default;
     if ( length $self->source_path ) {
         my $anchor = substr( $self->source_path, 1 );
         $baton  = $editor->open_root(-1);      # XXX: should use $t->revision
@@ -381,34 +410,55 @@
             }
         );
     }
-    $ra->replay( $changeset, 0, 1, $editor );
-    $self->_ra_finished($ra);
-    if ( length $self->source_path ) {
-        $editor->close_directory($baton);
-        if ( $editor->needs_touch ) {
-            $editor->change_dir_prop( $baton, 'svk:mirror' => undef );
-        }
-    }
-    if ( $editor->isa('SVK::Editor::SubTree') && !$editor->changes ) {
-        $editor->abort_edit;
-    } else {
-        $editor->close_edit;
-    }
-    return;
+    return $editor;
+}
 
+sub _revmap_prop {    # record the source changeset on the txn as svm:headrev ("<server_uuid>:<changeset>")
+    my ($self, $txn, $changeset) = @_;
+    $txn->change_prop('svm:headrev', $self->mirror->server_uuid.":$changeset\n");
 }
 
+
 =item mirror_changesets
 
 =cut
 
 sub mirror_changesets {
     my ( $self, $torev, $callback ) = @_;
-
     $self->mirror->with_lock( 'mirror',
         sub {
-            $self->traverse_new_changesets(
-                sub { $self->sync_changeset( @_, $callback ) }, $torev );
+	    $self->refresh;
+	    my @revs;
+            $self->traverse_new_changesets( sub { push @revs, [@_] }, $torev );
+	    # Requests for the pipelined ra, in the exact order the loop below issues them: [rev_proplist,] replay per rev.
+	    my @gen;
+	    my $revprop = $self->mirror->depot->mirror->revprop; # XXX: this is so wrong (reaches back through $self->mirror->depot)
+	    return unless @revs;    # nothing new to mirror
+	    my $ra = $self->_new_ra;
+	    if ($self->use_pipeline) {
+		for (@revs) {
+		    push @gen, ['rev_proplist', $_->[0]] if $revprop;
+		    push @gen, ['replay', $_->[0], 0, 1, 'EDITOR'];
+		}
+		$ra = SVK::Mirror::Backend::SVNRaPipe->new($ra, sub { shift @gen });    # forks; the child runs @gen ahead of this loop
+	    }
+	    my $pool = SVN::Pool->new_default;    # scratch pool, cleared per changeset
+	    for (@revs) {
+		$pool->clear;
+		my ($changeset, $metadata) = @$_;
+		my $extra_prop = {};
+		if ( $revprop ) {
+		    my $prop = $ra->rev_proplist($changeset);    # must be issued in @gen order: before this rev's replay
+		    for (@$revprop) {
+			$extra_prop->{$_}= $prop->{$_}
+			    if exists $prop->{$_};
+		    }
+		}
+		$self->sync_changeset( $changeset, $metadata, $ra,
+				       $extra_prop,
+				       $callback );
+	    }
+	    $self->_ra_finished($ra);    # caches a plain ra; a pipelined one is not cached
         }
     );
 }

Added: trunk/lib/SVK/Mirror/Backend/SVNRaPipe.pm
==============================================================================
--- (empty file)
+++ trunk/lib/SVK/Mirror/Backend/SVNRaPipe.pm	Fri Nov 10 11:11:52 2006
@@ -0,0 +1,310 @@
+package SVK::Mirror::Backend::SVNRaPipe;
+use strict;
+
+use base 'Class::Accessor::Fast';
+__PACKAGE__->mk_accessors(qw(ra requests fh unsent_buf buf_call current_editors pid));
+
+use Carp ();
+use IO::Handle;
+use POSIX 'EPIPE';
+use Socket;
+use Storable qw(nfreeze thaw);
+use SVK::Editor::Serialize;
+
+=head1 NAME
+
+SVK::Mirror::Backend::SVNRaPipe - Transparent SVN::Ra requests pipelining
+
+=head1 SYNOPSIS
+
+ my @req = (['rev_proplist', 3], ['replay', 3, 0, 1, 'EDITOR']);
+ $generator = sub { shift @req };
+ $pra = SVK::Mirror::Backend::SVNRaPipe->new($ra, $generator);
+
+ $pra->rev_proplist(3);
+ $pra->replay(3, 0, 1, SVK::Editor->new);
+
+=head1 DESCRIPTION
+
+C<new> forks a child process that issues every request produced by the
+generator against the real C<SVN::Ra> object, ahead of the parent, and
+streams the responses back over a socket pair.  The parent must then
+make the same calls, with the same arguments and in the same order;
+each call consumes the matching response from the stream.  Where the
+generator has the placeholder C<EDITOR>, the parent passes a real
+L<SVK::Editor>, which is driven with the recorded editor calls.
+
+Each message on the socket is a 4-byte network-order length followed
+by an C<nfreeze>d array entry.
+
+=cut
+
+sub new {
+    my ($class, $ra, $gen) = @_;
+
+    socketpair(my $c, my $p, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
+        or die "socketpair: $!";
+
+    my $self = $class->SUPER::new(
+        {
+            ra              => $ra,
+            requests        => $gen,
+            fh              => $c,
+            current_editors => 0,
+            buf_call        => [],
+            unsent_buf      => ''
+        }
+    );
+
+    my $pid = fork;
+    die "cannot fork: $!" unless defined $pid;
+
+    if ($pid) {
+        # Parent: responses are read from $c.
+        close $p;
+        $self->pid($pid);
+        return $self;
+    }
+
+    # Child: responses are written to $p; this branch never returns.
+    close $c;
+    $self->fh($p);
+    $self->_serve_requests($gen);
+    exit;
+}
+
+# Child side: run each generated request against the real ra and queue
+# its response for the parent.  Before each request, block until no more
+# than $max_editor_in_buf complete replay responses remain queued.
+sub _serve_requests {
+    my ($self, $gen) = @_;
+    # NOTE(review): presumably keeps this child's exit from deleting
+    # File::Temp files the parent still uses -- confirm.
+    $File::Temp::KEEP_ALL = 1;
+    my $max_editor_in_buf = 5;
+    my $pool = SVN::Pool->new_default;
+    while (my $req = $gen->()) {
+        $pool->clear;
+        my ($cmd, @arg) = @$req;
+        @arg = map {
+            $_ eq 'EDITOR'
+                ? SVK::Editor::Serialize->new(
+                    {
+                        cb_serialize_entry =>
+                            sub { $self->_enqueue(@_); $self->try_flush }
+                    }
+                  )
+                : $_
+        } @arg;
+
+        # Note that we might want to switch to bandwidth based buffering.
+        while ($self->current_editors > $max_editor_in_buf) {
+            $self->try_flush(1);
+        }
+
+        my $ret = $self->ra->$cmd(@arg);
+        if ($cmd eq 'replay') {    # XXX support other requests using editors
+            ++$self->{current_editors};
+            # Fabricated end-of-response marker for the parent's replay().
+            $self->_enqueue([undef, 'close_edit']);
+        }
+        else {
+            $self->_enqueue([$ret, $cmd]);
+        }
+        $self->try_flush();
+    }
+
+    # Drain the queue, including the tail of a partially written message.
+    while (@{ $self->buf_call } || length $self->unsent_buf) {
+        $self->try_flush(1);
+    }
+    return;
+}
+
+# Child side: append one entry to the send queue.
+sub _enqueue {
+    my ($self, $entry) = @_;
+    push @{ $self->buf_call }, $entry;
+}
+
+# Child side: write queued entries to the parent, each framed as a 4-byte
+# network-order length plus its nfreeze()d form.  With $wait true the
+# socket is made blocking; otherwise return at once unless it is writable.
+# Stops at the first short or failed write; the rest goes out later.
+sub try_flush {
+    my ($self, $wait) = @_;
+    my $fh = $self->fh;
+    if ($wait) {
+        $fh->blocking(1);
+    }
+    else {
+        $fh->blocking(0);
+        my $wstate = '';
+        vec($wstate, fileno($fh), 1) = 1;
+        select(undef, $wstate, undef, 0);
+        return unless vec($wstate, fileno($fh), 1);
+    }
+
+    my $buf = $self->buf_call;
+    while (@$buf || length $self->unsent_buf) {
+        # Finish a partially written message first so framing stays intact.
+        if (my $len = length $self->unsent_buf) {
+            my $ret = syswrite($fh, $self->unsent_buf);
+            return $self->_write_failed($ret) unless $ret;
+            substr($self->{unsent_buf}, 0, $ret, '');
+            last if $ret != $len;
+        }
+        last unless @$buf;
+
+        my $msg = nfreeze($buf->[0]);
+        $msg = pack('N', length $msg) . $msg;
+        my $ret = syswrite($fh, $msg);
+        return $self->_write_failed($ret) unless $ret;
+        # Whatever did not fit is sent before the next message.
+        $self->{unsent_buf} .= substr($msg, $ret) if $ret != length $msg;
+        --$self->{current_editors} if (shift @$buf)->[1] eq 'close_edit';
+    }
+    return;
+}
+
+# Called when syswrite wrote nothing.  EPIPE (the parent went away) and
+# unexpected errors are fatal; would-block and interrupted writes are
+# left for a later try_flush to retry.
+sub _write_failed {
+    my ($self, $ret) = @_;
+    return if defined $ret;    # zero bytes written; not an error
+    die "pipeline ra: parent closed the connection: $!\n" if $! == EPIPE;
+    return if $!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR} || $!{ENOBUFS};
+    die "pipeline ra: write failed: $!\n";
+}
+
+# Client code reading pipelined responses
+
+# Read one length-prefixed message; returns a reference to its payload.
+sub read_msg {
+    my $self = shift;
+    my $len = unpack('N', $self->_read_exactly(4));
+    my $msg = $self->_read_exactly($len);
+    return \$msg;
+}
+
+# Read exactly $want bytes, dying if the child's stream ends early.
+sub _read_exactly {
+    my ($self, $want) = @_;
+    my $buf = '';
+    while (length($buf) < $want) {
+        my $got = read($self->fh, $buf, $want - length($buf), length($buf));
+        if (!defined $got) {
+            next if $!{EINTR};
+            die "pipeline ra: read failed: $!\n";
+        }
+        die "pipeline ra: unexpected end of stream from child\n" unless $got;
+    }
+    return $buf;
+}
+
+# Check that a client call matches the next generated request, and so
+# the next response in the stream.  Returns the caller's editor when the
+# request has an EDITOR placeholder.
+sub ensure_client_cmd {
+    my ($self, @arg) = @_;
+    my $req = $self->requests->()
+        or Carp::confess "pipeline ra error: no pipelined request left for $arg[0]";
+    for my $exp (@$req) {
+        my $got = shift @arg;
+        if ($exp eq 'EDITOR') {
+            Carp::confess "pipeline ra error: expecting an SVK::Editor"
+                unless UNIVERSAL::isa($got, 'SVK::Editor');
+            return $got;
+        }
+        Carp::confess "pipeline ra error: got "
+            . (defined $got ? $got : 'undef') . " but expecting $exp"
+            if !defined $got || $got ne $exp;
+    }
+    Carp::confess "pipeline ra error: unexpected extra arguments: "
+        . join(',', @arg)
+        if @arg;
+    return;
+}
+
+sub rev_proplist {
+    my $self = shift;
+    $self->ensure_client_cmd('rev_proplist', @_);
+    # The response is a single synchronous [$ret, 'rev_proplist'] entry.
+    my $data = thaw( ${ $self->read_msg } );
+    die 'inconsistent response' unless $data->[1] eq 'rev_proplist';
+    return $data->[0];
+}
+
+# Drive the caller's editor with the recorded calls for this replay,
+# mapping the child's integer batons back to the editor's own batons,
+# until the fabricated close_edit marker arrives.
+sub replay {
+    my $self = shift;
+    my $editor = $self->ensure_client_cmd('replay', @_);
+    my $baton_map = {};
+    my $baton_pool = {};
+
+    while (1) {
+        my ($next, $func, @arg) = @{ thaw( ${ $self->read_msg } ) };
+        my $baton_at = SVK::Editor->baton_at($func);
+        my $baton;
+        if ($baton_at >= 0) {
+            $baton = $arg[$baton_at];
+            $arg[$baton_at] = $baton_map->{$baton};
+        }
+
+        my $ret = $self->emit_editor_call($editor, $func, undef, @arg);
+
+        last if $func eq 'close_edit';
+
+        if ($func =~ m/^close/) {
+            Carp::cluck $func unless $baton_map->{$baton};
+            delete $baton_map->{$baton};
+            delete $baton_pool->{$baton};
+        }
+
+        if ($next) {
+            $baton_pool->{$next} = SVN::Pool->new_default;
+            $baton_map->{$next} = $ret;
+        }
+    }
+    return;
+}
+
+# Forward one recorded call to $editor.  For apply_textdelta the last
+# element is the whole svndiff, which is parsed back into the window
+# handler the editor returns.
+sub emit_editor_call {
+    my ($self, $editor, $func, $pool, @arg) = @_;
+    my $ret;
+    if ($func eq 'apply_textdelta') {
+        my $svndiff = pop @arg;
+        $ret = $editor->apply_textdelta(@arg, $pool);
+
+        if ($ret && $#$ret > 0) {
+            my $stream = SVN::TxDelta::parse_svndiff(@$ret, 1, $pool);
+            print $stream $svndiff;
+            close $stream;
+        }
+    }
+    elsif ($func ne 'close_edit') {
+        # close_edit is fabricated by the child to mark the end of a replay
+        # response; replay itself doesn't give us that, so don't emit it.
+        $ret = $editor->$func(@arg, $pool);
+    }
+    return $ret;
+}
+
+# Parent side: reap the child.  Our end of the socket is closed first so
+# a child still blocked writing unread responses sees a broken pipe and
+# exits, instead of both processes waiting on each other.
+sub DESTROY {
+    my $self = shift;
+    return unless $self->pid;
+    local ($!, $?);
+    close $self->fh;
+    waitpid($self->pid, 0);
+}
+
+1;

Modified: trunk/lib/SVK/Mirror/Backend/SVNSync.pm
==============================================================================
--- trunk/lib/SVK/Mirror/Backend/SVNSync.pm	(original)
+++ trunk/lib/SVK/Mirror/Backend/SVNSync.pm	Fri Nov 10 11:11:52 2006
@@ -49,50 +49,26 @@
     $self->mirror->depot->reposfs->change_rev_prop( 0, 'svn:svnsync:from-url',  $self->mirror->url );
 }
 
-sub find_rev_from_changeset {
-    return $_[0];
-}
-
-sub sync_changeset {
-    my ( $self, $changeset, $metadata, $callback ) = @_;
-    my $t = $self->mirror->get_svkpath('/');
-    my ( $editor, undef, %opt ) = $t->get_editor(
-        ignore_mirror => 1,
-        message       => $metadata->{message},
-        author        => $metadata->{author},
-        callback      => sub {
-            $t->repos->fs->change_rev_prop( $_[0], 'svn:date',
-                $metadata->{date} );
-            $self->fromrev( $_[0] );
-            $callback->( $changeset, $_[0] ) if $callback;
-        }
-    );
+sub find_rev_from_changeset { $_[1] }    # called as a method, so $_[0] is the invocant; svnsync mirrors keep the changeset number as the rev
 
-    my $ra = $self->_new_ra;
-    my $pool = SVN::Pool->new_default;
-    if ( my $revprop = $self->mirror->depot->mirror->revprop ) {
-        my $prop = $ra->rev_proplist($changeset);
-        for (@$revprop) {
-            $opt{txn}->change_prop( $_, $prop->{$_} )
-                if exists $prop->{$_};
-        }
-    }
+sub _revmap_prop { }    # svnsync-style mirrors record no svm:headrev revmap
 
-    $editor = SVK::Editor::CopyHandler->new(
+sub _get_sync_editor {    # wrap $editor in a CopyHandler that rewrites copyfrom path/rev via $target->as_url
+    my ($self, $editor, $target) = @_;
+    return SVK::Editor::CopyHandler->new(
         _editor => $editor,
         cb_copy => sub {
             my ( $editor, $path, $rev ) = @_;
             return ( $path, $rev ) if $rev == -1;
             $path =~ s{^\Q/}{};
-            return $t->as_url( 1, $path, $rev );
+            return $target->as_url( 1, $path, $rev );    # $rev is passed through untranslated
         }
-    );
+    )
+}
 
-    $ra->replay( $changeset, 0, 1, $editor );
-    $self->_ra_finished($ra);
+sub _after_replay {    # no SubTree anchor to close here (the sync editor is only a CopyHandler); just close the edit
+    my ($self, $ra, $editor) = @_;
     $editor->close_edit;
-    return;
-
 }
 
 sub _relayed { }

Modified: trunk/lib/SVK/Path/View.pm
==============================================================================
--- trunk/lib/SVK/Path/View.pm	(original)
+++ trunk/lib/SVK/Path/View.pm	Fri Nov 10 11:11:52 2006
@@ -55,7 +55,9 @@
     }
 
     my ($editor, $inspector, %extra) = $self->source->new(path => $actual_anchor)->get_editor(%arg);
-
+    # XXX: view has txns, not very happy with forked processes.
+    $extra{mirror}->_backend->use_pipeline(0)
+        if $extra{mirror} && $extra{mirror}->_backend->isa('SVK::Mirror::Backend::SVNRa');
     my $prefix = abs2rel($self->source->path_anchor,
 			 $actual_anchor => undef, '/');
 


More information about the svk-commit mailing list