[svk-commit] r2117 - in branches/mirror-pipeline: .
clkao at bestpractical.com
clkao at bestpractical.com
Mon Nov 6 04:12:26 EST 2006
Author: clkao
Date: Mon Nov 6 04:12:26 2006
New Revision: 2117
Added:
branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRaPipe.pm
Modified:
branches/mirror-pipeline/ (props changed)
branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNSync.pm
Log:
r7222 at ubuntu: clkao | 2006-11-06 02:50:38 +0000
put generalised pipeline ra in separate module to be used by svnra backend
as well.
Added: branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRaPipe.pm
==============================================================================
--- (empty file)
+++ branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRaPipe.pm Mon Nov 6 04:12:26 2006
@@ -0,0 +1,229 @@
+package SVK::Mirror::Backend::SVNRaPipe;
+use strict;
+
+use base 'Class::Accessor::Fast';
+__PACKAGE__->mk_accessors(qw(ra requests fh unsent_buf buf_call current_editors));
+
+use POSIX 'EPIPE';
+use Socket;
+
+
+=head1 NAME
+
+SVK::Mirror::Backend::SVNRaPipe -
+
+=head1 SYNOPSIS
+
+ my @req = (['rev_proplist', 3], ['replay', 3, 0, 1, 'EDITOR']);
+ $generator = sub { shift @req };
+ $pra = SVK::Mirror::Backend::SVNRaPipe->new($ra, $generator);
+
+ $pra->rev_proplist(3);
+ $pra->replay(3, 0, 1, SVK::Editor->new);
+
+=head1 DESCRIPTION
+
+
+
+=cut
+
+sub entry {
+ my ($self, $entry) = @_;
+ push @{$self->buf_call}, $entry;
+}
+
+sub try_flush {
+ my $self = shift;
+ my $wait = shift;
+ my $max_write = $wait ? -1 : 10;
+ if ($wait) {
+ $self->fh->blocking(1);
+ }
+ else {
+ $self->fh->blocking(0);
+ my $wstate = '';
+ vec($wstate,fileno($self->fh),1) = 1;
+ select(undef, $wstate, undef, 0);;
+ return unless vec($wstate,fileno($self->fh),1);
+
+ }
+ my $i = 0;
+ my $buf = $self->buf_call;
+ while ( $#{$buf} >= 0 || length($self->unsent_buf) ) {
+ if (my $len = length $self->unsent_buf) {
+ if (my $ret = syswrite($self->fh, $self->unsent_buf)) {
+ substr($self->{unsent_buf}, 0, $ret, '');
+ last if $ret != $len;
+ }
+ else {
+ die if $! == EPIPE;
+ return;
+ }
+ }
+ last if $#{$buf} < 0;
+ my $msg = nfreeze($buf->[0]);
+ $msg = pack('N', length($msg)).$msg;
+
+ if (my $ret = syswrite($self->fh, $msg)) {
+ $self->{unsent_buf} .= substr($msg, $ret) if length($msg) != $ret;
+ if ((shift @$buf)->[1] eq 'close_edit') {
+ --$self->{current_editors} ;
+ }
+ }
+ else {
+ die if $! == EPIPE;
+ # XXX: check $! for fatal
+ last;
+ }
+ }
+}
+
+
+sub new {
+ my ($class, $ra , $gen) = @_;
+
+ socketpair(my $c, my $p, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
+ or die "socketpair: $!";
+
+ my $self = $class->SUPER::new(
+ {
+ ra => $ra,
+ requests => $gen,
+ fh => $c,
+ current_editors => 0,
+ buf_call => [],
+ unsent_buf => ''
+ }
+ );
+
+ if (my $pid = fork) {
+ close $p;
+ return $self;
+ }
+ else {
+ die "cannot fork: $!" unless defined $pid;
+ close $c;
+ }
+
+ $self->fh($p);
+ # Begin external process for buffered ra requests.
+ my $max_editor_in_buf = 5;
+ my $pool = SVN::Pool->new_default;
+ while (my $req = $gen->()) {
+ $pool->clear;
+ my ($cmd, @arg) = @$req;
+ require SVK::Editor::Serialize;
+ @arg = map { $_ eq 'EDITOR' ? SVK::Editor::Serialize->new({ cb_serialize_entry =>
+ sub { $self->entry(@_); $self->try_flush } })
+ : $_ } @arg;
+
+ while ($self->current_editors > $max_editor_in_buf) {
+ $self->try_flush(1);
+ }
+
+ my $ret = $self->ra->$cmd(@arg);
+ if ($cmd eq 'replay') { # XXX or other requests using editors
+ ++$self->{current_editors};
+ $self->entry([undef, 'close_edit']);
+ }
+ else {
+ $self->entry([$ret, $cmd]);
+ }
+ $self->try_flush();
+ }
+
+ while ($#{$self->buf_call} >= 0) {
+ $self->try_flush($p, 1) ;
+ }
+ exit;
+}
+
+# Client code reading pipelined responses
+
+use Storable qw(nfreeze thaw);
+
+sub read_msg {
+ my $self = shift;
+ my ($len, $msg);
+ read $self->fh, $len, 4 or die $!;
+ $len = unpack ('N', $len);
+ my $rlen = read $self->fh, $msg, $len or die $!;
+ return \$msg;
+}
+
+sub ensure_client_cmd {
+ my ($self, @arg) = @_;
+ # XXX: error message
+ my @exp = @{$self->requests->()};
+ for (@exp) {
+ my $arg = shift @arg;
+ if ($_ eq 'EDITOR') {
+ die unless UNIVERSAL::isa($arg, 'SVK::Editor');
+ return $arg;
+ }
+ die if ($_ cmp $arg);
+ }
+ die join(',',@arg) if @arg;
+}
+
+sub rev_proplist {
+ my $self = shift;
+ $self->ensure_client_cmd('rev_proplist', @_);
+ # read synchronous msg
+ my $data = thaw( ${$self->read_msg} );
+ # XXX: ensure $data->[1] is rev_proplist
+ return $data->[0];
+}
+
+
+sub replay {
+ my $self = shift;
+ my $editor = $self->ensure_client_cmd('replay', @_);
+ my $baton_map = {};
+ my $baton_pool = {};
+
+ while (my $data = $self->read_msg) {
+ my ($next, $func, @arg) = @{thaw($$data)};
+ my $baton_at = SVK::Editor->baton_at($func);
+ my $baton = $arg[$baton_at];
+ if ($baton_at >= 0) {
+ $arg[$baton_at] = $baton_map->{$baton};
+ }
+
+ my $ret = $self->emit_editor_call($editor, $func, undef, @arg);
+
+ last if $func eq 'close_edit';
+
+ if ($func =~ m/^close/) {
+ Carp::cluck $func unless $baton_map->{$baton};
+ delete $baton_map->{$baton};
+ delete $baton_pool->{$baton};
+ }
+
+ if ($next) {
+ $baton_pool->{$next} = SVN::Pool->new_default;
+ $baton_map->{$next} = $ret
+ }
+ }
+}
+
+sub emit_editor_call {
+ my ($self, $editor, $func, $pool, @arg) = @_;
+ my ($ret, $baton_at);
+ if ($func eq 'apply_textdelta') {
+ my $svndiff = pop @arg;
+ $ret = $editor->apply_textdelta(@arg, $pool);
+
+ if ($ret && $#$ret > 0) {
+ my $stream = SVN::TxDelta::parse_svndiff(@$ret, 1, $pool);
+ print $stream $svndiff;
+ close $stream;
+ }
+ }
+ else {
+ $ret = $editor->$func(@arg, $pool);
+ }
+ return $ret;
+}
+
+1;
Modified: branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNSync.pm
==============================================================================
--- branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNSync.pm (original)
+++ branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNSync.pm Mon Nov 6 04:12:26 2006
@@ -99,92 +99,29 @@
=cut
-sub get_pipeline_editor_fh {
- my ($self, $gen) = @_;
- my $ra = $self->_new_ra;
-
- return SVK::Mirror::Backend::SVNSync::Async->new($self, $ra, $gen);
-}
-
sub mirror_changesets {
my ( $self, $torev, $callback ) = @_;
$self->mirror->with_lock( 'mirror',
sub {
$self->refresh;
- my $from = ($self->fromrev || 0)+1;
- my $ra = $self->_new_ra;
- $torev ||= $ra->get_latest_revnum;
- $self->_ra_finished($ra);
- my @revs = ($from..$torev);
- my $fh = $self->get_pipeline_editor_fh(sub { shift @revs });
- $self->traverse_new_changesets(
- sub { $self->evil_sync_changeset( @_, $fh, $callback ) }, $torev );
+ my @revs;
+ $self->traverse_new_changesets( sub { push @revs, [@_] }, $torev );
+ # prepare generator for pipelined ra
+ my @gen;
+ for ($revs[0][0]..$revs[-1][0]) {
+ push @gen, ['rev_proplist', $_], ['replay', $_, 0, 1, 'EDITOR'];
+ }
+ require SVK::Mirror::Backend::SVNRaPipe;
+ my $pra = SVK::Mirror::Backend::SVNRaPipe->new($self->_new_ra, sub { shift @gen });
+ for (@revs) {
+ $self->evil_sync_changeset( @$_, $pra, $callback );
+ }
}
);
}
-sub emit {
- my ($self, $editor, $func, $pool, @arg) = @_;
- my ($ret, $baton_at);
- if ($func eq 'apply_textdelta') {
- my $svndiff = pop @arg;
- $ret = $editor->apply_textdelta(@arg, $pool);
- if ($ret && $#$ret > 0) {
- my $stream = SVN::TxDelta::parse_svndiff(@$ret, 1, $pool);
- print $stream $svndiff;
- close $stream;
- }
- }
- else {
- $ret = $editor->$func(@arg, $pool);
- }
- return $ret;
-}
-
-use Storable 'thaw';
-
-sub read_msg {
- my ($sock) = @_;
- my ($len, $msg);
- read $sock, $len, 4 or die $!;
- $len = unpack ('N', $len);
- my $rlen = read $sock, $msg, $len or die $!;
- return \$msg;
-}
-
-sub _read_evil_replay {
- my ($self, $editor, $fh) = @_;
- my $baton_map = {};
- my $baton_pool = {};
-
- while (my $data = read_msg($fh)) {
- my $x = thaw($$data);
- my ($next, $func, @arg) = @$x;
- my $baton_at = SVK::Editor->baton_at($func);
- my $baton = $arg[$baton_at];
- if ($baton_at >= 0) {
- $arg[$baton_at] = $baton_map->{$baton};
- }
-
- my $ret = $self->emit($editor, $func, undef, @arg);
-
- last if $func eq 'close_edit';
-
- if ($func =~ m/^close/) {
- Carp::cluck $func unless $baton_map->{$baton};
- delete $baton_map->{$baton};
- delete $baton_pool->{$baton};
- }
-
- if ($next) {
- $baton_pool->{$next} = SVN::Pool->new_default;
- $baton_map->{$next} = $ret
- }
- }
-}
-
sub evil_sync_changeset {
- my ( $self, $changeset, $metadata, $fh, $callback ) = @_;
+ my ( $self, $changeset, $metadata, $ra, $callback ) = @_;
my $t = $self->mirror->get_svkpath('/');
my ( $editor, undef, %opt ) = $t->get_editor(
ignore_mirror => 1,
@@ -198,7 +135,6 @@
}
);
- my $ra = $self->_new_ra;
my $pool = SVN::Pool->new_default;
if ( my $revprop = $self->mirror->depot->mirror->revprop ) {
my $prop = $ra->rev_proplist($changeset);
@@ -217,119 +153,11 @@
return $t->as_url( 1, $path, $rev );
}
);
-
- $self->_read_evil_replay($editor, $fh);
- $self->_ra_finished($ra);
+ $ra->replay( $changeset, 0, 1, $editor );
return;
}
sub _relayed { }
-package SVK::Mirror::Backend::SVNSync::Async;
-use SVK::Editor::Serialize;
-use Socket;
-
-my $max_editor_in_buf = 5;
-my $current_editors = 0;
-my $unsent_buf = '';
-
-my $buf;
-my $max;
-
-use IO::Handle;
-use Storable 'nfreeze';
-use POSIX 'EPIPE';
-
-sub on_close_edit {
- --$current_editors;
-}
-
-sub try_flush {
- my $fh = shift;
- my $wait = shift;
- my $max_write = $wait ? -1 : 10;
- if ($wait) {
- $fh->blocking(1);
- }
- else {
- $fh->blocking(0);
- my $wstate = '';
- vec($wstate,fileno($fh),1) = 1;
- select(undef, $wstate, undef, 0);;
- return unless vec($wstate,fileno($fh),1);
-
- }
- my $i = 0;
- while ( $#{$buf} >= 0 || length($unsent_buf) ) {
- if (my $len = length $unsent_buf) {
- if (my $ret = syswrite($fh, $unsent_buf)) {
- substr($unsent_buf, 0, $ret, '');
- last if $ret != $len;
- }
- else {
- die if $! == EPIPE;
- return;
- }
- }
- last if $#{$buf} < 0;
- use Carp;
- Carp::cluck unless defined $buf->[0];
- my $msg = nfreeze($buf->[0]);
- $msg = pack('N', length($msg)).$msg;
-
- if (my $ret = syswrite($fh, $msg)) {
- $unsent_buf .= substr($msg, $ret) if length($msg) != $ret;
- on_close_edit() if (shift @$buf)->[1] eq 'close_edit';
- }
- else {
- die if $! == EPIPE;
- # XXX: check $! for fatal
- last;
- }
- }
-}
-
-sub entry {
- my $entry = shift;
- push @$buf, $entry;
-}
-
-sub new {
- my ($self, $svnsync, $ra, $gen) = @_;
- socketpair(my $c, my $p, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
- or die "socketpair: $!";
-
- if (my $pid = fork) {
- close $p;
- return $c;
- }
- else {
- die "cannot fork: $!" unless defined $pid;
- close $c;
- }
-
- my $pool = SVN::Pool->new_default;
- while (my $changeset = $gen->()) {
- $pool->clear;
- while ($current_editors > $max_editor_in_buf) {
- try_flush($p, 1);
- }
-
- ++$current_editors;
- $ra->replay($changeset, 0, 1,# SVK::Editor->new(_debug=>1));
- SVK::Editor::Serialize->new({ cb_serialize_entry =>
- sub { entry(@_); try_flush($p) } }));
- entry([undef, 'close_edit']);
- try_flush($p);
- }
-
- while ($#{$buf} >= 0) {
- try_flush($p, 1) ;
- }
- exit;
-}
-
-
-
1;
More information about the svk-commit
mailing list