[svk-commit] r2120 - in branches/mirror-pipeline: .
clkao at bestpractical.com
clkao at bestpractical.com
Mon Nov 6 04:14:16 EST 2006
Author: clkao
Date: Mon Nov 6 04:14:15 2006
New Revision: 2120
Modified:
branches/mirror-pipeline/ (props changed)
branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRa.pm
branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRaPipe.pm
Log:
r7226 at ubuntu: clkao | 2006-11-06 05:05:56 +0000
cleanup and misc docs.
Modified: branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRa.pm
==============================================================================
--- branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRa.pm (original)
+++ branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRa.pm Mon Nov 6 04:14:15 2006
@@ -459,6 +459,10 @@
die $@ if $@;
}
+=item sync_changeset($changeset, $metadata, $ra, $extra_prop, $callback )
+
+=cut
+
sub sync_changeset {
my ( $self, $changeset, $metadata, $ra, $extra_prop, $callback ) = @_;
my $t = $self->mirror->get_svkpath;
Modified: branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRaPipe.pm
==============================================================================
--- branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRaPipe.pm (original)
+++ branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRaPipe.pm Mon Nov 6 04:14:15 2006
@@ -6,11 +6,12 @@
use POSIX 'EPIPE';
use Socket;
-
+use Storable qw(nfreeze thaw);
+use SVK::Editor::Serialize;
=head1 NAME
-SVK::Mirror::Backend::SVNRaPipe -
+SVK::Mirror::Backend::SVNRaPipe - Transparent SVN::Ra requests pipelining
=head1 SYNOPSIS
@@ -27,58 +28,6 @@
=cut
-sub entry {
- my ($self, $entry) = @_;
- push @{$self->buf_call}, $entry;
-}
-
-sub try_flush {
- my $self = shift;
- my $wait = shift;
- my $max_write = $wait ? -1 : 10;
- if ($wait) {
- $self->fh->blocking(1);
- }
- else {
- $self->fh->blocking(0);
- my $wstate = '';
- vec($wstate,fileno($self->fh),1) = 1;
- select(undef, $wstate, undef, 0);;
- return unless vec($wstate,fileno($self->fh),1);
-
- }
- my $i = 0;
- my $buf = $self->buf_call;
- while ( $#{$buf} >= 0 || length($self->unsent_buf) ) {
- if (my $len = length $self->unsent_buf) {
- if (my $ret = syswrite($self->fh, $self->unsent_buf)) {
- substr($self->{unsent_buf}, 0, $ret, '');
- last if $ret != $len;
- }
- else {
- die if $! == EPIPE;
- return;
- }
- }
- last if $#{$buf} < 0;
- my $msg = nfreeze($buf->[0]);
- $msg = pack('N', length($msg)).$msg;
-
- if (my $ret = syswrite($self->fh, $msg)) {
- $self->{unsent_buf} .= substr($msg, $ret) if length($msg) != $ret;
- if ((shift @$buf)->[1] eq 'close_edit') {
- --$self->{current_editors} ;
- }
- }
- else {
- die if $! == EPIPE;
- # XXX: check $! for fatal
- last;
- }
- }
-}
-
-
sub new {
my ($class, $ra , $gen) = @_;
@@ -108,28 +57,28 @@
$self->fh($p);
$File::Temp::KEEP_ALL = 1;
- # Begin external process for buffered ra requests.
+ # Begin external process for buffered ra requests and send response to parent.
my $max_editor_in_buf = 5;
my $pool = SVN::Pool->new_default;
while (my $req = $gen->()) {
$pool->clear;
my ($cmd, @arg) = @$req;
- require SVK::Editor::Serialize;
@arg = map { $_ eq 'EDITOR' ? SVK::Editor::Serialize->new({ cb_serialize_entry =>
- sub { $self->entry(@_); $self->try_flush } })
+ sub { $self->_enqueue(@_); $self->try_flush } })
: $_ } @arg;
+ # Note that we might want to switch to bandwidth based buffering,
while ($self->current_editors > $max_editor_in_buf) {
$self->try_flush(1);
}
my $ret = $self->ra->$cmd(@arg);
- if ($cmd eq 'replay') { # XXX or other requests using editors
+ if ($cmd eq 'replay') { # XXX support other requests using editors
++$self->{current_editors};
- $self->entry([undef, 'close_edit']);
+ $self->_enqueue([undef, 'close_edit']);
}
else {
- $self->entry([$ret, $cmd]);
+ $self->_enqueue([$ret, $cmd]);
}
$self->try_flush();
}
@@ -140,9 +89,57 @@
exit;
}
-# Client code reading pipelined responses
+sub _enqueue {
+ my ($self, $entry) = @_;
+ push @{$self->buf_call}, $entry;
+}
-use Storable qw(nfreeze thaw);
+sub try_flush {
+ my $self = shift;
+ my $wait = shift;
+ my $max_write = $wait ? -1 : 10;
+ if ($wait) {
+ $self->fh->blocking(1);
+ }
+ else {
+ $self->fh->blocking(0);
+ my $wstate = '';
+ vec($wstate,fileno($self->fh),1) = 1;
+ select(undef, $wstate, undef, 0);;
+ return unless vec($wstate,fileno($self->fh),1);
+ }
+ my $i = 0;
+ my $buf = $self->buf_call;
+ while ( $#{$buf} >= 0 || length($self->unsent_buf) ) {
+ if (my $len = length $self->unsent_buf) {
+ if (my $ret = syswrite($self->fh, $self->unsent_buf)) {
+ substr($self->{unsent_buf}, 0, $ret, '');
+ last if $ret != $len;
+ }
+ else {
+ die if $! == EPIPE;
+ return;
+ }
+ }
+ last if $#{$buf} < 0;
+ my $msg = nfreeze($buf->[0]);
+ $msg = pack('N', length($msg)).$msg;
+
+ if (my $ret = syswrite($self->fh, $msg)) {
+ $self->{unsent_buf} .= substr($msg, $ret) if length($msg) != $ret;
+ if ((shift @$buf)->[1] eq 'close_edit') {
+ --$self->{current_editors} ;
+ }
+ }
+ else {
+ die if $! == EPIPE;
+ # XXX: check $! for fatal
+ last;
+ }
+ }
+}
+
+# Client code reading pipelined responses
sub read_msg {
my $self = shift;
@@ -173,7 +170,7 @@
$self->ensure_client_cmd('rev_proplist', @_);
# read synchronous msg
my $data = thaw( ${$self->read_msg} );
- # XXX: ensure $data->[1] is rev_proplist
+ die 'inconsistent response' unless $data->[1] eq 'rev_proplist';
return $data->[0];
}
@@ -223,8 +220,11 @@
}
}
else {
- return if $func eq 'close_edit';
- $ret = $editor->$func(@arg, $pool);
+ # do not emit the fabricated close_edit, as replay doesn't
+ # give us that. We need that in the stream so the client code
+ # of replay knows the end of response has reached.
+ $ret = $editor->$func(@arg, $pool)
+ unless $func eq 'close_edit';
}
return $ret;
}
More information about the svk-commit
mailing list