package Sprocket::Plugin::HTTP; use Sprocket qw( Plugin AIO ); use base 'Sprocket::Plugin'; use POE qw( Filter::HTTPD Filter::Stream Wheel::ReadWrite Driver::SysRW ); use HTTP::Response; use HTTP::Date; use HTTP::Status qw( is_info RC_BAD_REQUEST ); use Time::HiRes qw( time ); use MIME::Types; use Scalar::Util qw( blessed ); use bytes; use strict; use warnings; sub OK() { 1 } sub DEFER() { 0 } sub BAD() { undef } our %simple_responses = ( 403 => 'Forbidden', 404 => 'The requested URL was not found on this server.', 500 => 'A server error occurred', ); our $GzipError; BEGIN { eval 'use IO::Compress::Gzip qw( gzip $GzipError );'; eval 'sub HAS_GZIP() { '.( $@ ? 0 : 1 ).' }'; } sub new { shift->SUPER::new( mime => MIME::Types->new(), gzip_enabled => 1, min_gzip_size => 200, # bytes send_file_backoff => .3, @_ ); } sub import { my $self = shift; my $package = caller(); my @exports = qw( OK DEFER BAD ); push( @exports, @_ ) if ( @_ ); no strict 'refs'; foreach my $sub ( @exports ) { *{ $package . '::' . $sub } = \&$sub; } } # the default local_accept behavior is to accept sub local_connected { my ( $self, $server, $con, $socket ) = @_; $self->take_connection( $con ); # POE::Filter::Stackable object: $con->filter->push( POE::Filter::HTTPD->new( headers_only => 1 ) ); # cut laggers off $con->set_time_out( 5 ); } sub start_http_request { my ( $self, $server, $con, $req ) = @_; my $x = $con->x; if ( $x->{_send_file_running} ) { # NO pipelining during a send_file $con->close( 1 ); return undef; } delete $x->{_close}; $x->{_start_time} = time() unless ( $x->{_start_time} ); if ( blessed( $req ) ) { $con->pause(); # no requests until we're ready if ( $req->isa( 'HTTP::Response' ) ) { $x->{_r} ||= $req; # a prebuilt http::response $x->{_req} ||= HTTP::Request->new(); # $x->{_close} = 1; $con->call( 'finish' ); return DEFER; } elsif ( $req->isa( 'HTTP::Request' ) ) { $x->{_req} ||= $req; # can continue request handling return OK; } } $server->_log( v => 2, msg => "request isn't an HTTP object: $req" ); $x->{_r} = HTTP::Response->new( 200 ); $x->{_close} = 1; $con->call( finish => 'invalid request' ); return BAD; } sub send_file { my ( $self, $server, $con, $file, $fh, $file_size ) = @_; my $r = $self->_set_headers( $con ); $r->header( "Content-Length" => $file_size ); # warn "going to send: $file_size bytes, $file"; $con->send( $r ); $con->x->{_send_file_progress} = 1; # this will start the transfer after the header is flushed $con->x->{flush_callback} = sub { # wait for the last flush return if ( $con->get_driver_out_octets ); # no more notices $con->watch_flush( 0 ); $con->x->{__saved_timeout} = $con->time_out; $con->time_out( undef ); # we need to know about disconnects while sending $con->resume(); # TODO ranges # can $size be too big for sendfile? # XXX use the constant from POE::Wheel::ReadWrite? aio_sendfile( $con->wheel->[ 0 ], $fh, 0, $file_size, $con->callback( send_file_complete => $file => $fh => $file_size => 0 ) ); delete $con->x->{flush_callback}; }; $con->watch_flush( 1 ); } sub local_flushed { my ( $self, $server, $con ) = @_; # XXX flush callback hack my $cb = $con->x->{flush_callback}; $cb->() if ( $cb ); } sub send_file_complete { my ( $self, $server, $con, $file, $fh, $file_size, $total, $sent ) = @_; # be safe unless ( $con->connected ) { $con->pause(); $con->close( 1 ); } $total += $sent if ( $sent != -1 ); if ( $file_size > $total ) { # TODO read_ahead #warn "short write, continuing at $total, to ".( $file_size - $total )." file total: $file_size"; # not quite complete # return aio_sendfile( $con->wheel->[ 0 ], $fh, $total, ( $file_size - $total ), # $con->postback( send_file_complete => $file => $fh => $file_size => $total ) ); # delay sets active # delay sendfile to reduce cpu usage return $con->delay_set( _send_file_backoff => $self->{send_file_backoff} => [ $con->wheel->[ 0 ], $fh, $total, ( $file_size - $total ), $con->postback( send_file_complete => $file => $fh => $file_size => $total ) ] ); } my $x = $con->x; if ( $x->{_close} ) { $con->pause(); # no more requests $con->close(); } else { $con->time_out( $x->{__saved_timeout} ) if ( $x->{__saved_timeout} ); $x->{__requests}++; delete $x->{_send_file_progress}; $con->resume(); } $server->_log(v => 1, msg => join( ' ', ( $x->{_req} ? $x->{_req}->protocol : '?' ), ( $x->{_r} ? $x->{_r}->code : '?' ), ( $x->{_start_time} ? ( time() - $x->{_start_time} ) : '?' ), ( defined $sent ? $sent : '-' ), ( $x->{_req} ? $x->{_req}->uri : '?' ), ( $x->{_uri} ? "[$x->{_uri}]" : "" ), )); } sub _send_file_backoff { my ( $self, $server, $con, $args ) = @_; $con->active(); # AIO's prototype forces me to do this return aio_sendfile( $args->[ 0 ], $args->[ 1 ], $args->[ 2 ], $args->[ 3 ], $args->[ 4 ] ); } sub simple_response { my ( $self, $server, $con, $code, $extra ) = @_; $code ||= '000'; # XXX do something else with status? my $status = HTTP::Status::status_message( $code ) || 'Unknown Error'; my $r = $con->x->{_r} ||= HTTP::Response->new(); $r->code( $code ); if ( $code == 301 || $code == 302 ) { $r->header( Location => $extra || '/' ); return $con->call( finish => '' ); } elsif ( $code == 304 || is_info( $code ) ) { return $con->call( finish => '' ); } my $body = $simple_responses{ $code } || $status; $body .= '

'.$extra if ( defined $extra ); $r->content_type( 'text/html' ); $con->call( finish => qq| $code $status

$status

$body
Sprocket v$Sprocket::VERSION POE+Perl Server
| ); } sub finish { my ( $self, $server, $con, $out, $size ) = @_; my $r = $self->_set_headers( $con ); my $x = $con->x; # TODO # in request: # TODO Keep-Alive: 300 my $meth = $x->{_req}->method; my $head = ( $meth && $meth eq 'HEAD' ) ? 1 : 0; if ( defined( $out ) ) { if ( ref( $out ) && ref( $out ) eq 'SCALAR' ) { # must pass size if passing scalar ref $r->content( $$out ) unless $head; } else { $size = length( $out ) unless ( $size ); $r->content( $out ) unless $head; } $r->header( 'Content-Length' => $size ); } ENC: { if ( HAS_GZIP && !$head && defined $size && $self->{gzip_enabled} && $size >= $self->{min_gzip_size} ) { my $enc = $x->{_req}->header( 'accept-encoding' ); last unless ( $enc ); my %accepts = map { $_ => 1 } split( ',', $enc ); if ( $accepts{gzip} ) { my $output = ''; gzip( \ $r->content, \$output ) or do { $server->_log( v => 4, msg => 'Error gzipping content, skipping gzip encoding: '.$GzipError ); last ENC; }; my $gsize = length( $output ); if ( $gsize > $size ) { $server->_log( v => 4, msg => 'gzipped content was larger than input, skipping gzip encoding' ); last ENC; } $size = $gsize; $r->content( $output ); $r->header( 'Content-Encoding' => 'gzip' ); $r->header( 'Content-Length' => $size ); } } } # XXX check for content length if keep-alive? if ( $x->{_close} ) { $con->pause(); # no more requests $con->send( $r ); $con->close(); } else { # TODO set/reset timeout $con->send( $r ); $x->{__requests}++; $con->resume(); } return $server->_log(v => 1, msg => '400 bad request' ) if ( $r->code == 400 ); # TODO log full request` $server->_log(v => 1, msg => join( ' ', ( $x->{_req} && $x->{_req}->protocol ? $x->{_req}->protocol : '?' ), ( $r->code ? $r->code : '?' ), ( $r->header( 'X-Time-To-Serve' ) ? sprintf( '%.5g', $r->header( 'X-Time-To-Serve' ) ) : '?' ), ( defined $size ? $size : '-' ), ( $x->{_req} ? $x->{_req}->uri : '?' ), ( $x->{_uri} ? "[$x->{_uri}]" : "" ), ( $r->code && $r->code == 302 ? $r->header( 'Location' ) : '' ) )); return; } # private methods # backward compat # TODO deprecate *resolve_path = *_resolve_path; sub _resolve_path { my $self = shift; my $path = shift || ''; $path =~ s/\0//g; my $cwd = '/'; my $path_out = ''; if ($path eq '') { $path_out = $cwd; } elsif ($path eq '/') { $path_out = '/'; } else { my @real = split( m!/!, $cwd ); if ( $path =~ m!^/! ) { undef @real; } foreach ( split( m!/!, $path ) ) { if ( $_ eq '..' ) { pop( @real ) if ( $#real ); } elsif ( $_ eq '.' ) { next; } elsif ( $_ eq '~' ) { if ( $self->{home_path} ) { @real = split( m!/!, $self->{home_path} ); } else { next; } } else { push( @real, $_ ); } } $path_out = join( '/', @real ); } $path_out = ( $path_out =~ m!^/! ) ? $path_out : '/'.$path_out; $path_out .= ( $path_out =~ m!/$! ) ? '' : '/' if ( $path =~ m!/$! ); return $path_out; } sub _set_headers { my ( $self, $con ) = @_; my $time = time(); my $x = $con->x; my $r = $x->{_r} || HTTP::Response->new(); my $code = $r->code || "000"; # XXX WTF where is the space coming from? [500 ] $code =~ s/\s*//g; $r->code( $code ); $r->message( HTTP::Status::status_message( $code ) ); $r->header( Server => 'Sprocket/'.$Sprocket::VERSION ); $r->header( 'X-Powered-By' => 'Sprocket (http://sprocketframework.com/); ' . 'POE (http://poe.perl.org/); Perl (http://perl.org/)' ); $r->header( 'X-Time-To-Serve' => ( $time - $x->{_start_time} ) ); $r->header( Date => time2str( $time ) ); my $proto; $r->protocol( $proto = $x->{_req}->protocol ); if ( $proto && $proto eq 'HTTP/1.0' ) { unless ( defined( $x->{_close} ) ) { my $connection = $x->{_req}->header( 'connection' ); if ( $connection && $connection =~ m/^keep-alive$/i ) { $x->{_close} = 0; $r->header( 'Connection' => 'keep-alive' ); } else { $x->{_close} = 1; } } } elsif ( $proto && $proto eq 'HTTP/1.1' ) { # XXX # in 1.1, keep-alive is assumed # unless a plugin has set _close $x->{_close} = 0 unless ( $x->{_close} ); } else { # XXX $x->{_close} = 1; } $r->header( 'Connection' => 'close' ) if ( $x->{_close} ); # XXX max requests, not needed # $x->{_close} = 1 if ( $x->{__requests} && $x->{__requests} > 100 ); if ( $con->can( 'clid' ) ) { if ( my $clid = $con->clid ) { $r->header( 'X-Client-ID' => $clid ); } } $r->header( 'X-Sprocket-CID' => $con->ID ); return $r; } 1;