[svk-commit] r2107 - in branches/mirror-pipeline: lib/SVK/Editor
lib/SVK/Mirror/Backend
clkao at bestpractical.com
clkao at bestpractical.com
Sat Nov 4 16:25:42 EST 2006
Author: clkao
Date: Sat Nov 4 16:25:39 2006
New Revision: 2107
Added:
branches/mirror-pipeline/lib/SVK/Editor/Serialize.pm
Removed:
branches/mirror-pipeline/utils/replay-pipeline.pl
Modified:
branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNSync.pm
Log:
Move pipeline code into ::SVNSync so we don't exec
Added: branches/mirror-pipeline/lib/SVK/Editor/Serialize.pm
==============================================================================
--- (empty file)
+++ branches/mirror-pipeline/lib/SVK/Editor/Serialize.pm Sat Nov 4 16:25:39 2006
@@ -0,0 +1,51 @@
+package SVK::Editor::Serialize;
+use base 'SVK::Editor';
+
+__PACKAGE__->mk_accessors(qw(cb_serialize_entry));
+
+# Catch-all for every editor callback that has no explicit method in this
+# package.  The call is not applied anywhere; it is recorded as
+# [$ret, $func, @arg] and handed to the cb_serialize_entry callback.
+#
+# For callbacks whose name starts with "add" or "open", $ret is a fresh
+# integer taken from the per-editor {batons} counter and is also returned to
+# the caller (presumably to act as the baton of the new node -- confirm
+# against the reading side).  For every other callback $ret is undef.
+sub AUTOLOAD {
+    my ($self, @arg) = @_;
+    my $func = our $AUTOLOAD;
+    unless (defined $func) {
+        # Diagnose before the name is used, so the check can actually fire.
+        require Carp;
+        Carp::cluck("AUTOLOAD invoked without a method name");
+        return;
+    }
+    $func =~ s/^.*:://;
+
+    # All-uppercase names (DESTROY and the like) are not editor callbacks.
+    return if $func =~ m/^[A-Z]+$/;
+
+    # A trailing pool object is dropped instead of being serialized.
+    pop @arg if @arg && ref($arg[-1]) =~ m/^(?:SVN::Pool|_p_apr_pool_t)$/;
+
+    # Debug trace marking the beginning of an edit.
+    warn "==> starting " if $func eq 'open_root';
+
+    my $ret = $func =~ m/^(?:add|open)/ ? ++$self->{batons} : undef;
+    $self->cb_serialize_entry->([$ret, $func, @arg]);
+    return $ret;
+}
+
+# The apply_textdelta entry of the file currently being transmitted is kept
+# on the editor object itself, under the {apply_textdelta_entry} key, so that
+# separate editor instances never see each other's pending entry.
+
+# Serialize the end of a file: first the apply_textdelta entry recorded for
+# this editor (if there is one), then the close_file entry itself.
+#
+#   $baton    - baton of the file being closed
+#   $checksum - checksum passed along unchanged in the close_file entry
+#
+# Returns whatever the cb_serialize_entry callback returns for close_file.
+sub close_file {
+    my ($self, $baton, $checksum) = @_;
+    if (my $pending = delete $self->{apply_textdelta_entry}) {
+        $self->cb_serialize_entry->($pending);
+    }
+    return $self->cb_serialize_entry->([undef, 'close_file', $baton, $checksum]);
+}
+
+# Start recording a text delta for the file identified by $baton.
+#
+# The entry is [undef, 'apply_textdelta', $baton, @arg, $svndiff], where the
+# last element starts out empty and is filled through an in-memory file
+# handle by whatever writes to the returned svndiff stream.  Because that
+# data only arrives after this method has returned, the entry is not
+# serialized here; close_file emits it.
+#
+# Returns an array reference holding the result of SVN::TxDelta::to_svndiff.
+# Dies if the in-memory handle cannot be opened.
+sub apply_textdelta {
+    my ($self, $baton, @arg) = @_;
+
+    # A trailing pool object is dropped instead of being serialized.
+    pop @arg if @arg && ref($arg[-1]) =~ m/^(?:SVN::Pool|_p_apr_pool_t)$/;
+
+    my $entry = [undef, 'apply_textdelta', $baton, @arg, ''];
+    open my $svndiff, '>', \$entry->[-1]
+        or die "cannot open in-memory svndiff buffer: $!";
+    $self->{apply_textdelta_entry} = $entry;
+    return [SVN::TxDelta::to_svndiff($svndiff)];
+}
+
+
+1;
Modified: branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNSync.pm
==============================================================================
--- branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNSync.pm (original)
+++ branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNSync.pm Sat Nov 4 16:25:39 2006
@@ -98,9 +98,15 @@
=cut
+# Hand back whatever SVK::Mirror::Backend::SVNSync::Async->new produces for
+# this backend, a freshly created RA session and the changeset generator
+# $gen (a code reference that yields the next revision to replay).
+sub get_pipeline_editor_fh {
+    my $self  = shift;
+    my ($gen) = @_;
+
+    my $session = $self->_new_ra;
+    my $handle  = SVK::Mirror::Backend::SVNSync::Async->new($self, $session, $gen);
+    return $handle;
+}
+
sub mirror_changesets {
my ( $self, $torev, $callback ) = @_;
-
$self->mirror->with_lock( 'mirror',
sub {
$self->refresh;
@@ -108,11 +114,8 @@
my $ra = $self->_new_ra;
$torev ||= $ra->get_latest_revnum;
$self->_ra_finished($ra);
- warn $self->mirror->depot->repospath;
- warn "$from $torev";
- open my $fh, '-|', "perl -Ilib utils/replay-pipeline.pl ".$self->mirror->depot->repospath." / $from $torev 2>stderr " or die $!;
- require IO::Handle;
-# $fh->blocking(0);
+ my @revs = ($from..$torev);
+ my $fh = $self->get_pipeline_editor_fh(sub { shift @revs });
$self->traverse_new_changesets(
sub { $self->evil_sync_changeset( @_, $fh, $callback ) }, $torev );
}
@@ -124,11 +127,9 @@
my ($self, $editor, $func, $pool, @arg) = @_;
my ($ret, $baton_at);
if ($func eq 'apply_textdelta') {
-# $pool->default;
my $svndiff = pop @arg;
$ret = $editor->apply_textdelta(@arg, $pool);
if ($ret && $#$ret > 0) {
-# warn md5_hex($svndiff);
my $stream = SVN::TxDelta::parse_svndiff(@$ret, 1, $pool);
print $stream $svndiff;
close $stream;
@@ -159,7 +160,6 @@
my $pool;
while (my $data = read_msg($fh)) {
my $x = thaw($$data);
-# Carp::cluck (length $$data ) unless defined $x;
my ($next, $func, @arg) = @$x;
my $baton_at = SVK::Editor->baton_at($func);
my $baton = $arg[$baton_at];
@@ -181,7 +181,6 @@
$baton_pool->{$next} = SVN::Pool->new_default;
$baton_map->{$next} = $ret
}
-# warn "==> close edit" if $func eq 'close_edit';
}
}
@@ -220,15 +219,121 @@
}
);
-# warn "==> begin evil $changeset";
$self->_read_evil_replay($editor, $fh);
$self->_ra_finished($ra);
-#die;
return;
}
-
sub _relayed { }
+package SVK::Mirror::Backend::SVNSync::Async;
+use SVK::Editor::Serialize;
+use Socket;
+
+# Throttle: no further changeset is replayed while more than this many
+# replayed changesets still have their close_edit entry unwritten.
+my $max_editor_in_buf = 5;
+# Changesets replayed so far whose close_edit entry has not been written.
+my $current_editors = 0;
+# Tail of a frame left behind by a short write; it is always sent first.
+my $unsent_buf = '';
+
+# Queue of entries (array references) waiting to be frozen and written.
+# Starts out as an empty queue rather than relying on autovivification.
+my $buf = [];
+
+use IO::Handle;
+use Storable 'nfreeze';
+use POSIX 'EPIPE';
+
+# Record that one more replayed changeset has had its close_edit entry
+# written.  Returns the updated number of outstanding changesets.
+sub on_close_edit {
+    return $current_editors -= 1;
+}
+
+# Write as much of $_[1] to the handle $_[0] as syswrite accepts.
+#
+# Returns the number of bytes written, or 0 when nothing could be written
+# right now (EAGAIN, EWOULDBLOCK, EINTR, or a zero-length write).  Dies on
+# EPIPE and on every other write error, so callers that retry in a loop
+# cannot spin forever on a dead handle.  The data is read through @_ to
+# avoid copying a potentially large frame.
+sub _write_some {
+    my $written = syswrite($_[0], $_[1]);
+    return $written if $written;
+    return 0 if defined $written;
+    die "pipeline reader went away: $!" if $! == EPIPE;
+    return 0 if $!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR};
+    die "cannot write to pipeline: $!";
+}
+
+# Push queued entries down the handle.
+#
+#   $fh   - handle to write to (an IO::Handle, since ->blocking is used)
+#   $wait - true:  switch the handle to blocking mode and write;
+#           false: switch to non-blocking mode and return immediately
+#                  unless select() reports the handle writable.
+#
+# Each entry is sent as a frame: a 4-byte big-endian length (pack 'N')
+# followed by the nfreeze()d entry.  A frame that is only partly written has
+# its tail parked in $unsent_buf, which is sent before anything else on the
+# next round.  Writing a 'close_edit' entry calls on_close_edit().
+#
+# Returns nothing useful.  Dies when the write fails for good.
+sub try_flush {
+    use Carp;
+    my ($fh, $wait) = @_;
+
+    if ($wait) {
+        $fh->blocking(1);
+    }
+    else {
+        $fh->blocking(0);
+        my $wstate = '';
+        vec($wstate, fileno($fh), 1) = 1;
+        select(undef, $wstate, undef, 0);
+        return unless vec($wstate, fileno($fh), 1);
+    }
+
+    while ($#{$buf} >= 0 || length($unsent_buf)) {
+        # Finish the frame a previous short write left behind.
+        if (my $len = length $unsent_buf) {
+            my $written = _write_some($fh, $unsent_buf) or return;
+            substr($unsent_buf, 0, $written, '');
+            last if $written != $len;
+        }
+        last if $#{$buf} < 0;
+
+        Carp::cluck("undefined entry in the pipeline queue")
+            unless defined $buf->[0];
+        my $msg = nfreeze($buf->[0]);
+        $msg = pack('N', length($msg)) . $msg;
+
+        my $written = _write_some($fh, $msg) or last;
+        # The entry counts as sent once its frame is started; the rest of
+        # the frame goes out through $unsent_buf.
+        $unsent_buf .= substr($msg, $written) if length($msg) != $written;
+        on_close_edit() if (shift @$buf)->[1] eq 'close_edit';
+    }
+    return;
+}
+
+# Append one serialized-call entry to the queue of entries awaiting
+# transmission.  Only the first argument is used.  Returns the new length of
+# the queue.
+sub entry {
+    my ($item) = @_;
+    return push @{$buf}, $item;
+}
+
+# Fork a child that replays changesets and streams the serialized editor
+# calls over a socket pair.
+#
+#   $svnsync - the calling backend (not used here)
+#   $ra      - RA session the child replays from
+#   $gen     - code reference returning the next changeset to replay, and a
+#              false value when there is nothing left
+#
+# In the parent: returns its end of the socket pair, from which the frames
+# written by try_flush() can be read.  The child never returns; it exits
+# once every changeset has been replayed and everything queued has been
+# written.  Dies if socketpair or fork fails.
+sub new {
+    my ($self, $svnsync, $ra, $gen) = @_;
+    socketpair(my $c, my $p, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
+        or die "socketpair: $!";
+
+    my $pid = fork;
+    die "cannot fork: $!" unless defined $pid;
+    if ($pid) {
+        close $p;
+        return $c;
+    }
+    close $c;
+
+    my $pool = SVN::Pool->new_default;
+    while (my $changeset = $gen->()) {
+        $pool->clear;
+
+        # Throttle: do not run further ahead of the reader than allowed.
+        while ($current_editors > $max_editor_in_buf) {
+            warn "waiting for flush for $changeset.. ($current_editors).";
+            try_flush($p, 1);
+        }
+
+        ++$current_editors;
+        warn "replay $changeset ($current_editors)";
+        $ra->replay($changeset, 0, 1,
+            SVK::Editor::Serialize->new({
+                cb_serialize_entry => sub { entry(@_); try_flush($p) },
+            }));
+        entry([undef, 'close_edit']);
+        try_flush($p);
+    }
+
+    # Drain everything before exiting.  The tail of a partly written frame
+    # lives in $unsent_buf, not in the queue, so both must be empty or the
+    # reader is left with a truncated final frame.
+    while ($#{$buf} >= 0 || length $unsent_buf) {
+        try_flush($p, 1);
+    }
+    exit;
+}
+
+
+
1;
More information about the svk-commit
mailing list