[Bps-public-commit] r10936 - in Net-Server-Coro: . lib/Net/Server
alexmv at bestpractical.com
alexmv at bestpractical.com
Tue Feb 26 01:05:20 EST 2008
Author: alexmv
Date: Tue Feb 26 01:05:13 2008
New Revision: 10936
Added:
Net-Server-Coro/lib/Net/Server/Proto/Coro.pm
Removed:
Net-Server-Coro/lib/Net/Server/Proto/SSL.pm
Net-Server-Coro/lib/Net/Server/Proto/TCP.pm
Modified:
Net-Server-Coro/ (props changed)
Net-Server-Coro/lib/Net/Server/Coro.pm
Log:
r28116 at zoq-fot-pik: chmrr | 2008-02-26 01:04:52 -0500
* Do our own SSL handling; IO::Socket::SSL blocks internally during
SSL negotiation.
* Use async_pool for our coro pool, instead of rolling our own
* Don't include all of Net::Server::Proto::* just for the patched
->accept method
Modified: Net-Server-Coro/lib/Net/Server/Coro.pm
==============================================================================
--- Net-Server-Coro/lib/Net/Server/Coro.pm (original)
+++ Net-Server-Coro/lib/Net/Server/Coro.pm Tue Feb 26 01:05:13 2008
@@ -5,70 +5,41 @@
use vars qw($VERSION);
use EV;
use Coro;
-use Coro::Semaphore;
-use Coro::Handle;
-use Coro::Socket;
use base qw(Net::Server);
+use Net::Server::Proto::Coro;
-my $connections = new Coro::Semaphore 500; # $MAX_CONNECTS;
-use vars qw/$SELF @FH @POOL/;
+$VERSION = 0.1;
-no warnings 'redefine';
-
-sub Coro::Handle::accept {
- my ( $peername, $fh );
- while () {
- ( $fh, $peername ) = tied( ${ $_[0] } )->[0]->accept;
- if ($peername) {
- my $socket
- = $_[0]->new_from_fh( $fh,
- forward_class => tied( ${ $_[0] } )->[7] );
- return wantarray ? ( $socket, $peername ) : $socket;
- }
-
- return unless $!{EAGAIN};
-
- $_[0]->readable or return;
- }
-}
+use vars qw/$SELF @FH/;
sub post_bind_hook {
my $self = shift;
my $prop = $self->{server};
- $prop->{sock}
- = [ map { make_coro_socket($_) } @{ $prop->{sock} } ];
+ $prop->{sock} = [ map { make_coro_socket($_) } @{ $prop->{sock} } ];
}
sub make_coro_socket {
my $socket = shift;
- my $proto = $socket->NS_proto;
- $socket = Coro::Socket->new_from_fh( $socket, forward_class => ref $socket );
- $socket->NS_proto($proto);
+ my @extra;
+ if ( $socket->isa("IO::Socket::SSL") ) {
+ $socket = bless $socket, "Net::Server::Proto::TCP";
+ @extra = ( expects_ssl => 1 );
+ }
+ $socket = Net::Server::Proto::Coro->new_from_fh(
+ $socket,
+ forward_class => ref($socket),
+ @extra
+ );
return $socket;
}
-sub get_client_info { }
-
sub handler {
- while (1) {
- my $prop = $SELF->{server};
- my $fh = pop @FH;
- if ($fh) {
- $Coro::current->desc("Active connection");
- $prop->{connected} = 1;
- $prop->{client} = $fh;
- $SELF->run_client_connection;
- last if $SELF->done;
- $prop->{connected} = 0;
- close $fh;
- $connections->up;
- } else {
- last if @POOL >= 20; #$MAX_POOL;
- push @POOL, $Coro::current;
- $Coro::current->desc("Idle connection");
- schedule;
- }
- }
+ my $fh = shift;
+ my $prop = $SELF->{server};
+ $Coro::current->desc("Active connection");
+ $prop->{client} = $fh;
+ $SELF->run_client_connection;
+ close $fh;
}
### prepare for connections
@@ -80,21 +51,15 @@
async {
$Coro::current->desc("Listening on port @{[$socket->sockport]}");
while (1) {
- $connections->down;
my $accepted = scalar $socket->accept;
next unless $accepted;
- push @FH, $accepted;
- if (@POOL) {
- ( pop @POOL )->ready;
- } else {
- async \&handler;
- }
-
+ async_pool \&handler, $accepted;
}
};
}
async {
+
# We want this to be higher priority so it gets timeslices
# when other things cede; this guarantees that we notice
# socket activity and deal.
@@ -105,13 +70,31 @@
# This interrupts the event loop every 10 seconds to force it
# to check if we got sent a SIGINT, for instance. Otherwise
# it would hang until it got an I/O interrupt.
- my $death_notice = EV::timer(10, 10, sub {});
+ my $death_notice = EV::timer( 5, 5, sub { } );
while (1) {
- EV::loop( );
+ EV::loop();
}
};
schedule;
}
+## Due to outstanding patches for Net::Server
+use Net::Server::Proto::TCP;
+
+package Net::Server::Proto::TCP;
+no warnings 'redefine';
+
+sub accept {
+ my $sock = shift;
+ my ( $client, $peername ) = $sock->SUPER::accept();
+
+ ### pass items on
+ if ($peername) {
+ $client->NS_proto( $sock->NS_proto );
+ }
+
+ return wantarray ? ( $client, $peername ) : $client;
+}
+
1;
Added: Net-Server-Coro/lib/Net/Server/Proto/Coro.pm
==============================================================================
--- (empty file)
+++ Net-Server-Coro/lib/Net/Server/Proto/Coro.pm Tue Feb 26 01:05:13 2008
@@ -0,0 +1,268 @@
+use strict;
+use warnings;
+
+package Net::Server::Proto::Coro;
+use base qw/Coro::Socket/;
+
+use Net::SSLeay;
+
+sub new_from_fh {
+ my $class = shift;
+ my $fh = shift or return;
+ my $self = do { local *Coro::Handle };
+
+ tie $self, 'Net::Server::Proto::Coro::FH', fh => $fh, @_;
+
+ bless \$self, ref $class ? ref $class : $class;
+}
+
+sub accept {
+ my $self = shift;
+
+ my $socket = tied( ${$self} )->[0];
+ while (1) {
+ $self->readable or return;
+ my ( $fh, $peername ) = $socket->accept;
+ if ($peername) {
+ my $socket = $self->new_from_fh(
+ $fh,
+ forward_class => tied( ${$self} )->[7],
+ expects_ssl => tied( ${$self} )->[9]
+ );
+ return wantarray ? ( $socket, $peername ) : $socket;
+ }
+
+ return unless $!{EAGAIN};
+ }
+}
+
+sub expects_ssl {
+ my $self = tied ${ $_[0] };
+ $self->[9] = shift if @_;
+ return $self->[9];
+}
+
+sub is_ssl {
+ my $self = tied ${ $_[0] };
+ return $self->[10] ? 1 : 0;
+}
+
+sub start_SSL { Net::Server::Proto::Coro::FH::start_SSL( tied ${$_[0]} ); }
+sub read { Net::Server::Proto::Coro::FH::READ ( tied ${$_[0]}, $_[1], $_[2], $_[3]) }
+sub sysread { Net::Server::Proto::Coro::FH::READ ( tied ${$_[0]}, $_[1], $_[2], $_[3]) }
+sub syswrite { Net::Server::Proto::Coro::FH::WRITE ( tied ${$_[0]}, $_[1], $_[2], $_[3]) }
+sub print { Net::Server::Proto::Coro::FH::WRITE ( tied ${+shift}, join "", @_) }
+sub printf { Net::Server::Proto::Coro::FH::PRINTF ( tied ${+shift}, @_) }
+sub fileno { Net::Server::Proto::Coro::FH::FILENO ( tied ${$_[0]}) }
+sub close { Net::Server::Proto::Coro::FH::CLOSE ( tied ${$_[0]}) }
+
+package Net::Server::Proto::Coro::FH;
+use base qw/Coro::Handle::FH/;
+
+BEGIN {
+ Net::SSLeay::load_error_strings();
+ Net::SSLeay::SSLeay_add_ssl_algorithms();
+ Net::SSLeay::randomize();
+}
+
+sub TIEHANDLE {
+ my ( $class, %arg ) = @_;
+
+ my $self = $class->SUPER::TIEHANDLE(%arg);
+ $self->[9] = $arg{expects_ssl};
+ $self->[10] = undef;
+
+ return $self;
+}
+
+sub READ_UNTIL {
+ my $sub = pop;
+ my $tries = 0;
+ while () {
+
+ # first deplete the read buffer
+ if ( length $_[0][3] ) {
+ my $v = $sub->(@_);
+ return $v if defined $v;
+ }
+
+ return unless Coro::Handle::FH::readable( $_[0] );
+ my $r = Net::SSLeay::read( $_[0][10] );
+ my $errs = Net::SSLeay::print_errs('SSL_read');
+ if ($errs) {
+ warn "SSL Read error: $errs\n";
+ $_[0]->CLOSE;
+ last;
+ }
+
+ if ( defined $r and length $r ) {
+ $_[0][3] .= $r;
+ $tries = 0;
+ } else {
+ if ( ++$tries >= 10 ) {
+ $_[0]->force_close;
+ return;
+ }
+ }
+ }
+}
+
+sub READ {
+ return Coro::Handle::FH::READ(@_) unless $_[0][9];
+ return unless $_[0][10] or $_[0]->start_SSL();
+
+ my $len = $_[2];
+ my $ofs = $_[3];
+ my $stop = sub {
+ my $l = length $_[0][3];
+ if ( $l <= $len ) {
+ substr( $_[1], $ofs ) = $_[0][3];
+ $_[0][3] = "";
+ return $l;
+ } else {
+ substr( $_[1], $ofs ) = substr( $_[0][3], 0, $len );
+ substr( $_[0][3], 0, $len ) = "";
+ return $len;
+ }
+ return undef;
+ };
+
+ READ_UNTIL( @_, $stop );
+}
+
+sub READLINE {
+ return Coro::Handle::FH::READLINE(@_) unless $_[0][9];
+ return unless $_[0][10] or $_[0]->start_SSL();
+
+ my $irs = $_[1] || $/;
+ my $stop = sub {
+ my $pos = index $_[0][3], $irs;
+ if ( $pos >= 0 ) {
+ $pos += length $irs;
+ my $res = substr $_[0][3], 0, $pos;
+ substr( $_[0][3], 0, $pos ) = "";
+ return $res;
+ }
+ return undef;
+ };
+
+ READ_UNTIL( @_, $stop );
+}
+
+sub WRITE {
+ return Coro::Handle::FH::WRITE(@_) unless $_[0][9];
+ return unless $_[0][10] or $_[0]->start_SSL();
+
+ my $len = defined $_[2] ? $_[2] : length $_[1];
+ my $ofs = $_[3] || 0;
+ my $res = 0;
+
+ return unless Coro::Handle::FH::writable( $_[0] );
+ while (1) {
+ my $str = substr( $_[1], $ofs, $len );
+ my $r = Net::SSLeay::write( $_[0][10], $str );
+
+ if ( $r == -1 ) {
+ my $err = Net::SSLeay::get_error( $_[0][10], $r );
+
+ if ( $err == Net::SSLeay::ERROR_WANT_READ() ) {
+ Coro::Handle::FH::readable( $_[0] );
+ } elsif ( $err == Net::SSLeay::ERROR_WANT_WRITE() ) {
+ Coro::Handle::FH::writable( $_[0] );
+ } else {
+ my $errstr = Net::SSLeay::ERR_error_string($err);
+ warn "SSL write error: $err, $errstr\n";
+ $_[0]->force_close;
+ return undef;
+ }
+ } else {
+ $len -= $r;
+ $ofs += $r;
+ $res += $r;
+ return $res unless $len;
+ return unless Coro::Handle::FH::writable( $_[0] );
+ }
+ }
+}
+
+use constant SSL_RECEIVED_SHUTDOWN => 2;
+
+sub CLOSE {
+ return unless @{ $_[0] } and $_[0][0];
+ if ( $_[0][10] ) {
+ my $status = Net::SSLeay::get_shutdown( $_[0][10] );
+ unless ( $status == SSL_RECEIVED_SHUTDOWN ) {
+ local $SIG{PIPE} = sub { };
+ for my $try ( 1, 2 ) {
+ my $rv = Net::SSLeay::shutdown( $_[0][10] );
+ last unless $rv >= 0;
+ }
+ }
+ $_[0]->ssl_free;
+ }
+ my $handle = $_[0][0];
+ Coro::Handle::FH::cleanup(@_);
+ shutdown( $handle, 2 );
+}
+
+sub ssl_free {
+ Net::SSLeay::free( $_[0][10] );
+ $_[0][10] = undef;
+ $_[0][11] = undef;
+}
+
+sub force_close {
+ $_[0]->ssl_free if $_[0][10];
+ $_[0]->CLOSE;
+}
+
+use constant SSL_MODE_ENABLE_PARTIAL_WRITE => 1;
+use constant SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER => 2;
+
+use vars qw/$CONTEXT/;
+
+sub start_SSL {
+ my $ctx;
+
+ unless ($CONTEXT) {
+ $ctx = $CONTEXT = Net::SSLeay::CTX_new;
+ Net::SSLeay::CTX_set_options( $ctx, Net::SSLeay::OP_ALL() );
+ Net::SSLeay::CTX_set_mode( $ctx,
+ SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER
+ | SSL_MODE_ENABLE_PARTIAL_WRITE );
+ Net::SSLeay::CTX_use_PrivateKey_file( $ctx, "certs/server-key.pem",
+ Net::SSLeay::FILETYPE_PEM() );
+ Net::SSLeay::CTX_use_certificate_chain_file( $ctx,
+ "certs/server-cert.pem" );
+ }
+ $ctx = $CONTEXT;
+ $_[0][11] = $ctx;
+
+ my $ssl = Net::SSLeay::new($ctx);
+ Net::SSLeay::set_fd( $ssl, fileno( $_[0][0] ) );
+ $_[0][10] = $ssl;
+
+ while (1) {
+ my $rv = Net::SSLeay::accept($ssl);
+ if ( $rv < 0 ) {
+ my $err = Net::SSLeay::get_error( $ssl, $rv );
+ if ( $err == Net::SSLeay::ERROR_WANT_READ() ) {
+ return unless Coro::Handle::FH::readable( $_[0] );
+ } elsif ( $err == Net::SSLeay::ERROR_WANT_WRITE() ) {
+ return unless Coro::Handle::FH::writable( $_[0] );
+ } else {
+ my $errstr = Net::SSLeay::ERR_error_string($err);
+ warn "SSL accept error: $err, $errstr\n";
+ $_[0]->force_close;
+ return;
+ }
+ } elsif ( $rv == 0 ) {
+ $_[0]->force_close;
+ return;
+ } else {
+ return $_[0];
+ }
+ }
+}
+
+1;
More information about the Bps-public-commit
mailing list