[svk-commit] r2131 - in trunk: lib/SVK/Editor
lib/SVK/Mirror/Backend lib/SVK/Path
clkao at bestpractical.com
clkao at bestpractical.com
Fri Nov 10 11:11:53 EST 2006
Author: clkao
Date: Fri Nov 10 11:11:52 2006
New Revision: 2131
Added:
trunk/lib/SVK/Editor/Serialize.pm
trunk/lib/SVK/Mirror/Backend/SVNRaPipe.pm
Modified:
trunk/ (props changed)
trunk/MANIFEST
trunk/lib/SVK/Mirror/Backend/SVNRa.pm
trunk/lib/SVK/Mirror/Backend/SVNSync.pm
trunk/lib/SVK/Path/View.pm
Log:
Merge the mirror pipeline branch to trunk.
This makes svk use a separate process to buffer ra_replay calls,
so we are not bounded by network and local-fs-write latency. There
have been reports of about a 2x speedup for svk sync.
Modified: trunk/MANIFEST
==============================================================================
--- trunk/MANIFEST (original)
+++ trunk/MANIFEST Fri Nov 10 11:11:52 2006
@@ -78,6 +78,7 @@
lib/SVK/Editor/Merge.pm
lib/SVK/Editor/Patch.pm
lib/SVK/Editor/Rename.pm
+lib/SVK/Editor/Serialize.pm
lib/SVK/Editor/Sign.pm
lib/SVK/Editor/Status.pm
lib/SVK/Editor/SubTree.pm
@@ -117,6 +118,7 @@
lib/SVK/MimeDetect/Internal.pm
lib/SVK/Mirror.pm
lib/SVK/Mirror/Backend/SVNRa.pm
+lib/SVK/Mirror/Backend/SVNRaPipe.pm
lib/SVK/Mirror/Backend/SVNSync.pm
lib/SVK/MirrorCatalog.pm
lib/SVK/Notify.pm
Added: trunk/lib/SVK/Editor/Serialize.pm
==============================================================================
--- (empty file)
+++ trunk/lib/SVK/Editor/Serialize.pm Fri Nov 10 11:11:52 2006
@@ -0,0 +1,49 @@
+package SVK::Editor::Serialize;
+use base 'SVK::Editor';
+
+__PACKAGE__->mk_accessors(qw(cb_serialize_entry));
+
+sub AUTOLOAD {
+ my ($self, @arg) = @_;
+ my $func = our $AUTOLOAD;
+ $func =~ s/^.*:://;
+ return if $func =~ m/^[A-Z]+$/;
+ my $baton;
+ pop @arg if ref ($arg[-1]) =~ m/^(?:SVN::Pool|_p_apr_pool_t)$/;
+
+ if ((my $baton_at = $self->baton_at ($func)) >= 0) {
+ $baton = $arg[$baton_at];
+ }
+ else {
+ $baton = 0;
+ }
+
+ my $ret = $func =~ m/^(?:add|open)/ ? ++$self->{batons} : undef;
+ Carp::cluck unless defined $func;
+ $self->cb_serialize_entry->([$ret, $func, @arg]);
+ return $ret;
+}
+
+my $apply_textdelta_entry;
+
+sub close_file {
+ my ($self, $baton, $checksum) = @_;
+ if ($apply_textdelta_entry) {
+ $self->cb_serialize_entry->($apply_textdelta_entry);
+ $apply_textdelta_entry = undef;
+ }
+ $self->cb_serialize_entry->([undef, 'close_file', $baton, $checksum]);
+}
+
+sub apply_textdelta {
+ my ($self, $baton, @arg) = @_;
+ pop @arg if ref ($arg[-1]) =~ m/^(?:SVN::Pool|_p_apr_pool_t)$/;
+ my $entry = [undef, 'apply_textdelta', $baton, @arg, ''];
+ open my ($svndiff), '>', \$entry->[-1];
+# $self->cb_serialize_entry->($entry);
+ $apply_textdelta_entry = $entry;
+ return [SVN::TxDelta::to_svndiff($svndiff)];
+}
+
+
+1;
Modified: trunk/lib/SVK/Mirror/Backend/SVNRa.pm
==============================================================================
--- trunk/lib/SVK/Mirror/Backend/SVNRa.pm (original)
+++ trunk/lib/SVK/Mirror/Backend/SVNRa.pm Fri Nov 10 11:11:52 2006
@@ -6,6 +6,8 @@
use SVN::Ra;
use SVK::I18N;
use SVK::Editor;
+use SVK::Mirror::Backend::SVNRaPipe;
+
use Class::Autouse qw(SVK::Editor::SubTree SVK::Editor::CopyHandler);
## class SVK::Mirror::Backend::SVNRa;
@@ -19,7 +21,7 @@
# for this: things without _'s will probably move to base
# SVK::Mirror::Backend
-__PACKAGE__->mk_accessors(qw(mirror _config _auth_baton _auth_ref _auth_baton source_root source_path fromrev _has_replay _cached_ra));
+__PACKAGE__->mk_accessors(qw(mirror _config _auth_baton _auth_ref _auth_baton source_root source_path fromrev _has_replay _cached_ra use_pipeline));
=head1 NAME
@@ -51,7 +53,7 @@
sub load {
my ($class, $mirror) = @_;
- my $self = $class->SUPER::new( { mirror => $mirror } );
+ my $self = $class->SUPER::new( { mirror => $mirror, use_pipeline => 1 } );
my $t = $mirror->get_svkpath;
die loc( "%1 is not a mirrored path.\n", $t->depotpath )
unless $t->root->check_path( $mirror->path );
@@ -82,7 +84,7 @@
sub create {
my ($class, $mirror, $backend, $args, $txn, $editor) = @_;
- my $self = $class->SUPER::new({ mirror => $mirror });
+ my $self = $class->SUPER::new({ mirror => $mirror, use_pipeline => 1 });
my $ra = $self->_new_ra;
@@ -253,6 +255,7 @@
sub _ra_finished {
my ($self, $ra) = @_;
return if $self->_cached_ra;
+ return if ref($ra) eq 'SVK::Mirror::Backend::SVNRaPipe';
$self->_cached_ra( $ra );
}
@@ -326,8 +329,12 @@
die $@ if $@;
}
+=item sync_changeset($changeset, $metadata, $ra, $extra_prop, $callback )
+
+=cut
+
sub sync_changeset {
- my ($self, $changeset, $metadata, $callback) = @_;
+ my ( $self, $changeset, $metadata, $ra, $extra_prop, $callback ) = @_;
my $t = $self->mirror->get_svkpath;
my ( $editor, undef, %opt ) = $t->get_editor(
ignore_mirror => 1,
@@ -340,18 +347,41 @@
$callback->( $changeset, $_[0] ) if $callback;
}
);
- # XXX: sync relayed revmap as well
- $opt{txn}->change_prop('svm:headrev', $self->mirror->server_uuid.":$changeset\n");
- my $ra = $self->_new_ra;
- if ( my $revprop = $self->mirror->depot->mirror->revprop ) {
- my $prop = $ra->rev_proplist($changeset);
- for (@$revprop) {
- $opt{txn}->change_prop( $_, $prop->{$_} )
- if exists $prop->{$_};
+ for (keys %$extra_prop) {
+ $opt{txn}->change_prop( $_, $extra_prop->{$_} );
+ }
+ $self->_revmap_prop( $opt{txn}, $changeset );
+
+ $editor = $self->_get_sync_editor($editor, $t);
+ $ra->replay( $changeset, 0, 1, $editor );
+ $self->_after_replay($ra, $editor);
+
+ return;
+
+}
+
+sub _after_replay {
+ my ($self, $ra, $editor) = @_;
+ if ( $editor->isa('SVK::Editor::SubTree') ) {
+ my $baton = $editor->anchor_baton;
+ if ( $editor->needs_touch ) {
+ $editor->change_dir_prop( $baton, 'svk:mirror' => undef );
}
+ if (!$editor->changes) {
+ $editor->abort_edit;
+ return;
+ }
+ $editor->close_directory($baton);
}
+ $editor->close_edit;
+ return;
+
+}
+
+sub _get_sync_editor {
+ my ($self, $editor, $target) = @_;
$editor = SVK::Editor::CopyHandler->new(
_editor => $editor,
cb_copy => sub {
@@ -359,7 +389,7 @@
return ( $path, $rev ) if $rev == -1;
my $source_path = $self->source_path;
$path =~ s/^\Q$self->{source_path}//;
- return $t->as_url(
+ return $target->as_url(
1,
$self->mirror->path . $path,
$self->find_rev_from_changeset($rev)
@@ -370,7 +400,6 @@
# ra->replay gives us editor calls based on repos root not
# base uri, so we need to get the correct subtree.
my $baton;
- my $pool = SVN::Pool->new_default;
if ( length $self->source_path ) {
my $anchor = substr( $self->source_path, 1 );
$baton = $editor->open_root(-1); # XXX: should use $t->revision
@@ -381,34 +410,55 @@
}
);
}
- $ra->replay( $changeset, 0, 1, $editor );
- $self->_ra_finished($ra);
- if ( length $self->source_path ) {
- $editor->close_directory($baton);
- if ( $editor->needs_touch ) {
- $editor->change_dir_prop( $baton, 'svk:mirror' => undef );
- }
- }
- if ( $editor->isa('SVK::Editor::SubTree') && !$editor->changes ) {
- $editor->abort_edit;
- } else {
- $editor->close_edit;
- }
- return;
+ return $editor;
+}
+sub _revmap_prop {
+ my ($self, $txn, $changeset) = @_;
+ $txn->change_prop('svm:headrev', $self->mirror->server_uuid.":$changeset\n");
}
+
=item mirror_changesets
=cut
sub mirror_changesets {
my ( $self, $torev, $callback ) = @_;
-
$self->mirror->with_lock( 'mirror',
sub {
- $self->traverse_new_changesets(
- sub { $self->sync_changeset( @_, $callback ) }, $torev );
+ $self->refresh;
+ my @revs;
+ $self->traverse_new_changesets( sub { push @revs, [@_] }, $torev );
+ # prepare generator for pipelined ra
+ my @gen;
+ my $revprop = $self->mirror->depot->mirror->revprop; # XXX: this is so wrong
+ return unless @revs;
+ my $ra = $self->_new_ra;
+ if ($self->use_pipeline) {
+ for (@revs) {
+ push @gen, ['rev_proplist', $_->[0]] if $revprop;
+ push @gen, ['replay', $_->[0], 0, 1, 'EDITOR'];
+ }
+ $ra = SVK::Mirror::Backend::SVNRaPipe->new($ra, sub { shift @gen });
+ }
+ my $pool = SVN::Pool->new_default;
+ for (@revs) {
+ $pool->clear;
+ my ($changeset, $metadata) = @$_;
+ my $extra_prop = {};
+ if ( $revprop ) {
+ my $prop = $ra->rev_proplist($changeset);
+ for (@$revprop) {
+ $extra_prop->{$_}= $prop->{$_}
+ if exists $prop->{$_};
+ }
+ }
+ $self->sync_changeset( $changeset, $metadata, $ra,
+ $extra_prop,
+ $callback );
+ }
+ $self->_ra_finished($ra);
}
);
}
Added: trunk/lib/SVK/Mirror/Backend/SVNRaPipe.pm
==============================================================================
--- (empty file)
+++ trunk/lib/SVK/Mirror/Backend/SVNRaPipe.pm Fri Nov 10 11:11:52 2006
@@ -0,0 +1,238 @@
+package SVK::Mirror::Backend::SVNRaPipe;
+use strict;
+
+use base 'Class::Accessor::Fast';
+__PACKAGE__->mk_accessors(qw(ra requests fh unsent_buf buf_call current_editors pid));
+
+use POSIX 'EPIPE';
+use Socket;
+use Storable qw(nfreeze thaw);
+use SVK::Editor::Serialize;
+
+=head1 NAME
+
+SVK::Mirror::Backend::SVNRaPipe - Transparent SVN::Ra requests pipelining
+
+=head1 SYNOPSIS
+
+ my @req = (['rev_proplist', 3], ['replay', 3, 0, 1, 'EDITOR']);
+ $generator = sub { shift @req };
+ $pra = SVK::Mirror::Backend::SVNRaPipe->new($ra, $generator);
+
+ $pra->rev_proplist(3);
+ $pra->replay(3, 0, 1, SVK::Editor->new);
+
+=head1 DESCRIPTION
+
+
+
+=cut
+
+sub new {
+ my ($class, $ra , $gen) = @_;
+
+ socketpair(my $c, my $p, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
+ or die "socketpair: $!";
+
+ my $self = $class->SUPER::new(
+ {
+ ra => $ra,
+ requests => $gen,
+ fh => $c,
+ current_editors => 0,
+ buf_call => [],
+ unsent_buf => ''
+ }
+ );
+
+ if (my $pid = fork) {
+ close $p;
+ $self->pid($pid);
+ return $self;
+ }
+ else {
+ die "cannot fork: $!" unless defined $pid;
+ close $c;
+ }
+
+ $self->fh($p);
+ $File::Temp::KEEP_ALL = 1;
+ # Begin external process for buffered ra requests and send response to parent.
+ my $max_editor_in_buf = 5;
+ my $pool = SVN::Pool->new_default;
+ while (my $req = $gen->()) {
+ $pool->clear;
+ my ($cmd, @arg) = @$req;
+ @arg = map { $_ eq 'EDITOR' ? SVK::Editor::Serialize->new({ cb_serialize_entry =>
+ sub { $self->_enqueue(@_); $self->try_flush } })
+ : $_ } @arg;
+
+ # Note that we might want to switch to bandwidth based buffering,
+ while ($self->current_editors > $max_editor_in_buf) {
+ $self->try_flush(1);
+ }
+
+ my $ret = $self->ra->$cmd(@arg);
+ if ($cmd eq 'replay') { # XXX support other requests using editors
+ ++$self->{current_editors};
+ $self->_enqueue([undef, 'close_edit']);
+ }
+ else {
+ $self->_enqueue([$ret, $cmd]);
+ }
+ $self->try_flush();
+ }
+
+ while ($#{$self->buf_call} >= 0) {
+ $self->try_flush($p, 1) ;
+ }
+ exit;
+}
+
+sub _enqueue {
+ my ($self, $entry) = @_;
+ push @{$self->buf_call}, $entry;
+}
+
+sub try_flush {
+ my $self = shift;
+ my $wait = shift;
+ my $max_write = $wait ? -1 : 10;
+ if ($wait) {
+ $self->fh->blocking(1);
+ }
+ else {
+ $self->fh->blocking(0);
+ my $wstate = '';
+ vec($wstate,fileno($self->fh),1) = 1;
+ select(undef, $wstate, undef, 0);;
+ return unless vec($wstate,fileno($self->fh),1);
+ }
+ my $i = 0;
+ my $buf = $self->buf_call;
+ while ( $#{$buf} >= 0 || length($self->unsent_buf) ) {
+ if (my $len = length $self->unsent_buf) {
+ if (my $ret = syswrite($self->fh, $self->unsent_buf)) {
+ substr($self->{unsent_buf}, 0, $ret, '');
+ last if $ret != $len;
+ }
+ else {
+ die if $! == EPIPE;
+ return;
+ }
+ }
+ last if $#{$buf} < 0;
+ my $msg = nfreeze($buf->[0]);
+ $msg = pack('N', length($msg)).$msg;
+
+ if (my $ret = syswrite($self->fh, $msg)) {
+ $self->{unsent_buf} .= substr($msg, $ret) if length($msg) != $ret;
+ if ((shift @$buf)->[1] eq 'close_edit') {
+ --$self->{current_editors} ;
+ }
+ }
+ else {
+ die if $! == EPIPE;
+ # XXX: check $! for fatal
+ last;
+ }
+ }
+}
+
+# Client code reading pipelined responses
+
+sub read_msg {
+ my $self = shift;
+ my ($len, $msg);
+ read $self->fh, $len, 4 or die $!;
+ $len = unpack ('N', $len);
+ my $rlen = read $self->fh, $msg, $len or die $!;
+ return \$msg;
+}
+
+sub ensure_client_cmd {
+ my ($self, @arg) = @_;
+ # XXX: error message
+ my @exp = @{$self->requests->()};
+ for (@exp) {
+ my $arg = shift @arg;
+ if ($_ eq 'EDITOR') {
+ die unless UNIVERSAL::isa($arg, 'SVK::Editor');
+ return $arg;
+ }
+ Carp::confess "pipeline ra error: got $arg but expecting $_" if ($_ cmp $arg);
+ }
+ die join(',',@arg) if @arg;
+}
+
+sub rev_proplist {
+ my $self = shift;
+ $self->ensure_client_cmd('rev_proplist', @_);
+ # read synchronous msg
+ my $data = thaw( ${$self->read_msg} );
+ die 'inconsistent response' unless $data->[1] eq 'rev_proplist';
+ return $data->[0];
+}
+
+
+sub replay {
+ my $self = shift;
+ my $editor = $self->ensure_client_cmd('replay', @_);
+ my $baton_map = {};
+ my $baton_pool = {};
+
+ while (my $data = $self->read_msg) {
+ my ($next, $func, @arg) = @{thaw($$data)};
+ my $baton_at = SVK::Editor->baton_at($func);
+ my $baton = $arg[$baton_at];
+ if ($baton_at >= 0) {
+ $arg[$baton_at] = $baton_map->{$baton};
+ }
+
+ my $ret = $self->emit_editor_call($editor, $func, undef, @arg);
+
+ last if $func eq 'close_edit';
+
+ if ($func =~ m/^close/) {
+ Carp::cluck $func unless $baton_map->{$baton};
+ delete $baton_map->{$baton};
+ delete $baton_pool->{$baton};
+ }
+
+ if ($next) {
+ $baton_pool->{$next} = SVN::Pool->new_default;
+ $baton_map->{$next} = $ret
+ }
+ }
+}
+
+sub emit_editor_call {
+ my ($self, $editor, $func, $pool, @arg) = @_;
+ my ($ret, $baton_at);
+ if ($func eq 'apply_textdelta') {
+ my $svndiff = pop @arg;
+ $ret = $editor->apply_textdelta(@arg, $pool);
+
+ if ($ret && $#$ret > 0) {
+ my $stream = SVN::TxDelta::parse_svndiff(@$ret, 1, $pool);
+ print $stream $svndiff;
+ close $stream;
+ }
+ }
+ else {
+ # do not emit the fabricated close_edit, as replay doesn't
+ # give us that. We need that in the stream so the client code
+ # of replay knows the end of response has reached.
+ $ret = $editor->$func(@arg, $pool)
+ unless $func eq 'close_edit';
+ }
+ return $ret;
+}
+
+sub DESTROY {
+ my $self = shift;
+ return unless $self->pid;
+ wait;
+}
+
+1;
Modified: trunk/lib/SVK/Mirror/Backend/SVNSync.pm
==============================================================================
--- trunk/lib/SVK/Mirror/Backend/SVNSync.pm (original)
+++ trunk/lib/SVK/Mirror/Backend/SVNSync.pm Fri Nov 10 11:11:52 2006
@@ -49,50 +49,26 @@
$self->mirror->depot->reposfs->change_rev_prop( 0, 'svn:svnsync:from-url', $self->mirror->url );
}
-sub find_rev_from_changeset {
- return $_[0];
-}
-
-sub sync_changeset {
- my ( $self, $changeset, $metadata, $callback ) = @_;
- my $t = $self->mirror->get_svkpath('/');
- my ( $editor, undef, %opt ) = $t->get_editor(
- ignore_mirror => 1,
- message => $metadata->{message},
- author => $metadata->{author},
- callback => sub {
- $t->repos->fs->change_rev_prop( $_[0], 'svn:date',
- $metadata->{date} );
- $self->fromrev( $_[0] );
- $callback->( $changeset, $_[0] ) if $callback;
- }
- );
+sub find_rev_from_changeset { $_[0] }
- my $ra = $self->_new_ra;
- my $pool = SVN::Pool->new_default;
- if ( my $revprop = $self->mirror->depot->mirror->revprop ) {
- my $prop = $ra->rev_proplist($changeset);
- for (@$revprop) {
- $opt{txn}->change_prop( $_, $prop->{$_} )
- if exists $prop->{$_};
- }
- }
+sub _revmap_prop { }
- $editor = SVK::Editor::CopyHandler->new(
+sub _get_sync_editor {
+ my ($self, $editor, $target) = @_;
+ return SVK::Editor::CopyHandler->new(
_editor => $editor,
cb_copy => sub {
my ( $editor, $path, $rev ) = @_;
return ( $path, $rev ) if $rev == -1;
$path =~ s{^\Q/}{};
- return $t->as_url( 1, $path, $rev );
+ return $target->as_url( 1, $path, $rev );
}
- );
+ )
+}
- $ra->replay( $changeset, 0, 1, $editor );
- $self->_ra_finished($ra);
+sub _after_replay {
+ my ($self, $ra, $editor) = @_;
$editor->close_edit;
- return;
-
}
sub _relayed { }
Modified: trunk/lib/SVK/Path/View.pm
==============================================================================
--- trunk/lib/SVK/Path/View.pm (original)
+++ trunk/lib/SVK/Path/View.pm Fri Nov 10 11:11:52 2006
@@ -55,7 +55,9 @@
}
my ($editor, $inspector, %extra) = $self->source->new(path => $actual_anchor)->get_editor(%arg);
-
+ # XXX: view has txns, not very happy with forked processes.
+ $extra{mirror}->_backend->use_pipeline(0)
+ if $extra{mirror} && $extra{mirror}->_backend->isa('SVK::Mirror::Backend::SVNRa');
my $prefix = abs2rel($self->source->path_anchor,
$actual_anchor => undef, '/');
More information about the svk-commit
mailing list