[svk-commit] r2120 - in branches/mirror-pipeline: .

clkao at bestpractical.com clkao at bestpractical.com
Mon Nov 6 04:14:16 EST 2006


Author: clkao
Date: Mon Nov  6 04:14:15 2006
New Revision: 2120

Modified:
   branches/mirror-pipeline/   (props changed)
   branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRa.pm
   branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRaPipe.pm

Log:
 r7226 at ubuntu:  clkao | 2006-11-06 05:05:56 +0000
 cleanup and misc docs.


Modified: branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRa.pm
==============================================================================
--- branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRa.pm	(original)
+++ branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRa.pm	Mon Nov  6 04:14:15 2006
@@ -459,6 +459,10 @@
     die $@ if $@;
 }
 
+=item sync_changeset($changeset, $metadata, $ra, $extra_prop, $callback )
+
+=cut
+
 sub sync_changeset {
     my ( $self, $changeset, $metadata, $ra, $extra_prop, $callback ) = @_;
     my $t = $self->mirror->get_svkpath;

Modified: branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRaPipe.pm
==============================================================================
--- branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRaPipe.pm	(original)
+++ branches/mirror-pipeline/lib/SVK/Mirror/Backend/SVNRaPipe.pm	Mon Nov  6 04:14:15 2006
@@ -6,11 +6,12 @@
 
 use POSIX 'EPIPE';
 use Socket;
-
+use Storable qw(nfreeze thaw);
+use SVK::Editor::Serialize;
 
 =head1 NAME
 
-SVK::Mirror::Backend::SVNRaPipe - 
+SVK::Mirror::Backend::SVNRaPipe - Transparent SVN::Ra requests pipelining
 
 =head1 SYNOPSIS
 
@@ -27,58 +28,6 @@
 
 =cut
 
-sub entry {
-    my ($self, $entry) = @_;
-    push @{$self->buf_call}, $entry;
-}
-
-sub try_flush {
-    my $self = shift;
-    my $wait = shift;
-    my $max_write = $wait ? -1 : 10;
-    if ($wait) {
-	$self->fh->blocking(1);
-    }
-    else {
-	$self->fh->blocking(0);
-	my $wstate = '';
-	vec($wstate,fileno($self->fh),1) = 1;
-	select(undef, $wstate, undef, 0);;
-	return unless vec($wstate,fileno($self->fh),1);
-
-    }
-    my $i = 0;
-    my $buf = $self->buf_call;
-    while ( $#{$buf} >= 0 || length($self->unsent_buf) ) {
-	if (my $len = length $self->unsent_buf) {
-	    if (my $ret = syswrite($self->fh, $self->unsent_buf)) {
-		substr($self->{unsent_buf}, 0, $ret, '');
-		last if $ret != $len;
-	    }
-	    else {
-		die if $! == EPIPE;
-		return;
-	    }
-	}
-	last if $#{$buf} < 0;
-	my $msg = nfreeze($buf->[0]);
-	$msg = pack('N', length($msg)).$msg;
-
-	if (my $ret = syswrite($self->fh, $msg)) {
-	    $self->{unsent_buf} .= substr($msg, $ret)  if length($msg) != $ret;
-	    if ((shift @$buf)->[1] eq 'close_edit') {
-		--$self->{current_editors} ;
-	    }
-	}
-	else {
-	    die if $! == EPIPE;
-	    # XXX: check $! for fatal
-	    last;
-	}
-    }
-}
-
-
 sub new {
     my ($class, $ra , $gen) = @_;
 
@@ -108,28 +57,28 @@
 
     $self->fh($p);
     $File::Temp::KEEP_ALL = 1;
-    # Begin external process for buffered ra requests.
+    # Begin external process for buffered ra requests and send response to parent.
     my $max_editor_in_buf = 5;
     my $pool = SVN::Pool->new_default;
     while (my $req = $gen->()) {
 	$pool->clear;
 	my ($cmd, @arg) = @$req;
-	require SVK::Editor::Serialize;
 	@arg = map { $_ eq 'EDITOR' ? SVK::Editor::Serialize->new({ cb_serialize_entry =>
-								    sub { $self->entry(@_); $self->try_flush } })
+								    sub { $self->_enqueue(@_); $self->try_flush } })
 			            : $_ } @arg;
 
+	# Note that we might want to switch to bandwidth based buffering,
 	while ($self->current_editors > $max_editor_in_buf) {
 	    $self->try_flush(1);
 	}
 
 	my $ret = $self->ra->$cmd(@arg);
-	if ($cmd eq 'replay') { # XXX or other requests using editors
+	if ($cmd eq 'replay') { # XXX support other requests using editors
 	    ++$self->{current_editors};
-	    $self->entry([undef, 'close_edit']);
+	    $self->_enqueue([undef, 'close_edit']);
 	}
 	else {
-	    $self->entry([$ret, $cmd]);
+	    $self->_enqueue([$ret, $cmd]);
 	}
 	$self->try_flush();
     }
@@ -140,9 +89,57 @@
     exit;
 }
 
-# Client code reading pipelined responses
+sub _enqueue {
+    my ($self, $entry) = @_;
+    push @{$self->buf_call}, $entry;
+}
 
-use Storable qw(nfreeze thaw);
+sub try_flush {
+    my $self = shift;
+    my $wait = shift;
+    my $max_write = $wait ? -1 : 10;
+    if ($wait) {
+	$self->fh->blocking(1);
+    }
+    else {
+	$self->fh->blocking(0);
+	my $wstate = '';
+	vec($wstate,fileno($self->fh),1) = 1;
+	select(undef, $wstate, undef, 0);;
+	return unless vec($wstate,fileno($self->fh),1);
+    }
+    my $i = 0;
+    my $buf = $self->buf_call;
+    while ( $#{$buf} >= 0 || length($self->unsent_buf) ) {
+	if (my $len = length $self->unsent_buf) {
+	    if (my $ret = syswrite($self->fh, $self->unsent_buf)) {
+		substr($self->{unsent_buf}, 0, $ret, '');
+		last if $ret != $len;
+	    }
+	    else {
+		die if $! == EPIPE;
+		return;
+	    }
+	}
+	last if $#{$buf} < 0;
+	my $msg = nfreeze($buf->[0]);
+	$msg = pack('N', length($msg)).$msg;
+
+	if (my $ret = syswrite($self->fh, $msg)) {
+	    $self->{unsent_buf} .= substr($msg, $ret)  if length($msg) != $ret;
+	    if ((shift @$buf)->[1] eq 'close_edit') {
+		--$self->{current_editors} ;
+	    }
+	}
+	else {
+	    die if $! == EPIPE;
+	    # XXX: check $! for fatal
+	    last;
+	}
+    }
+}
+
+# Client code reading pipelined responses
 
 sub read_msg {
     my $self = shift;
@@ -173,7 +170,7 @@
     $self->ensure_client_cmd('rev_proplist', @_);
     # read synchronous msg
     my $data = thaw( ${$self->read_msg} );
-    # XXX: ensure $data->[1] is rev_proplist
+    die 'inconsistent response' unless $data->[1] eq 'rev_proplist';
     return $data->[0];
 }
 
@@ -223,8 +220,11 @@
 	}
     }
     else {
-	return if $func eq 'close_edit';
-	$ret = $editor->$func(@arg, $pool);
+	# do not emit the fabricated close_edit, as replay doesn't
+	# give us that.  We need that in the stream so the client code
+	# of replay knows the end of response has reached.
+	$ret = $editor->$func(@arg, $pool)
+	    unless $func eq 'close_edit';
     }
     return $ret;
 }


More information about the svk-commit mailing list