package POE::Component::Client::Asterisk::Manager; ###################################################################### ### POE::Component::Client::Asterisk::Manager ### David Davis (xantus@cpan.org) ### ### Copyright (c) 2003-2005 David Davis and Teknikill. All Rights ### Reserved. This module is free software; you can redistribute it ### and/or modify it under the same terms as Perl itself. ###################################################################### use strict; use warnings; our $VERSION = '0.07'; use Carp qw(croak); use POE qw( Component::Client::TCP ); use Digest::MD5; sub DEBUG { 0 } sub new { my $package = shift; croak "$package requires an even number of parameters" if @_ % 2; my %params = @_; my $alias = $params{'Alias'}; $alias = 'asterisk_client' unless defined($alias) and length($alias); my $listen_port = $params{listen_port} || 5038; POE::Session->create( #options => {trace=>1}, args => [ %params ], package_states => [ 'POE::Component::Client::Asterisk::Manager' => { _start => '_start', _stop => '_stop', signals => 'signals', } ], inline_states => $params{inline_states}, # { # _default => sub { # print STDERR "$_[STATE] called\n"; # }, # }, ); return 1; } sub _start { my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP]; my %params = splice(@_,ARG0); if (ref($params{Options}) eq 'HASH') { $session->option( %{$params{Options}} ); } $params{reconnect_time} = $params{reconnect_time} || 5; $params{requeue_posts} = $params{requeue_posts} || undef; $kernel->alias_set($params{Alias}); # watch for SIGINT # $kernel->sig('INT', 'signals'); $heap->{client} = POE::Component::Client::TCP->new( RemoteAddress => $params{RemoteHost}, RemotePort => $params{RemotePort}, # no longer a seperate package - see below Filter => "POE::Filter::Asterisk::Manager", Alias => "$params{Alias}_client", Args => [ \%params ], Started => sub { $_[HEAP]->{params} = $_[ARG0]; }, Disconnected => sub { $_[KERNEL]->delay(reconnect => $_[HEAP]->{params}->{reconnect_time}); }, Connected => sub { my $heap = $_[HEAP]; DEBUG && print STDERR sprintf("connected to %s:%s\n",$heap->{params}->{RemoteHost},$heap->{params}->{RemotePort}); $heap->{_connected} = 0; $heap->{_logged_in} = 0; $heap->{_auth_stage} = 0; $_[KERNEL]->delay( recv_timeout => 5 ); if ($heap->{params}->{Astmanproxy}) { # For astmanproxy? Don't wait for a response $heap->{_connected} = 1; $kernel->call($_[SESSION] => login => splice(@_,ARG0)); } }, ConnectError => sub { my $heap = $_[HEAP]; DEBUG && print STDERR sprintf("could not connect to %s:%s, reconnecting in %s seconds...\n" ,$heap->{params}->{RemoteHost},$heap->{params}->{RemotePort},$heap->{params}->{reconnect_time}); $_[KERNEL]->delay(reconnect => $heap->{params}->{reconnect_time}); }, ServerInput => sub { my ( $kernel, $heap, $input ) = @_[KERNEL, HEAP, ARG0]; DEBUG && do { require Data::Dumper; print Data::Dumper->Dump([$input],['input']); }; if ($heap->{_logged_in} == 0 && $heap->{_connected} == 0) { $kernel->delay( recv_timeout => 5 ); if ($input->{acm_version}) { $heap->{_version} = $input->{acm_version}; $heap->{_connected} = 1; $kernel->call($_[SESSION] => login => splice(@_,ARG0)); } else { print STDERR "Invalid Protocol (wrong port?)\n"; $kernel->yield("shutdown"); } } elsif ($heap->{_connected} == 1 && $heap->{_logged_in} == 0) { $kernel->call($_[SESSION] => login => splice(@_,ARG0)); } elsif ($heap->{_logged_in} == 1) { $kernel->call($_[SESSION] => callbacks => splice(@_,ARG0)); } }, InlineStates => { _put => sub { my $heap = $_[HEAP]; if ($heap->{server}) { $heap->{server}->put($_[ARG0]); } else { if ($heap->{requeue_posts}) { push(@{$heap->{queued}},$_[ARG0]); } else { print STDERR "cannot send when not connected! -ignored-\n"; } } }, login_complete => sub { my ( $kernel, $heap ) = @_[KERNEL, HEAP]; DEBUG && print STDERR "logged in and ready to process events\n"; # call the _connected state $kernel->yield("_connected" => splice(@_,ARG0)); }, recv_timeout => sub { my ( $kernel, $heap ) = @_[KERNEL, HEAP]; unless ($heap->{_connected} == 1) { print STDERR "Timeout waiting for response\n"; $heap->{_connected} = 0; $heap->{_logged_in} = 0; $kernel->yield("shutdown"); } }, login => sub { my ($kernel, $heap, $input) = @_[KERNEL, HEAP, ARG0]; if ($heap->{_logged_in} == 1) { # shouldn't get here DEBUG && print STDERR "Login called when already logged in\n"; #$kernel->yield(callbacks => splice(@_,ARG0)); return; } if ($heap->{_auth_stage} == 0) { $heap->{server}->put({'Action' => 'Challenge', 'AuthType' => 'MD5'}); $heap->{_auth_stage} = 1; } elsif ($heap->{_auth_stage} == 1) { unless ($input->{Response} && lc($input->{Response}) eq 'success') { print STDERR "AuthType MD5 may not be supported\n"; $kernel->yield("shutdown"); return; } if ($input->{Challenge}) { if (! defined $heap->{params}->{Password}) { print STDERR "No password provided\n"; $kernel->yield("shutdown"); return; } my $digest = Digest::MD5::md5_hex("$input->{Challenge}$heap->{params}->{Password}"); $heap->{server}->put({'Action' => 'Login', 'AuthType' => 'MD5', 'Username' => $heap->{params}->{Username}, 'Key' => $digest }); $heap->{_auth_stage} = 2; } } elsif ($heap->{_auth_stage} == 2) { if ($input->{Message} && lc($input->{Message}) eq 'authentication accepted') { delete $heap->{_auth_stage}; $heap->{_logged_in} = 1; # I remembered inline_states not working (above), so i put this in foreach my $k (keys %{$heap->{params}->{inline_states}}) { $kernel->state($k => $heap->{params}->{inline_states}{$k}); } if (ref($heap->{queued}) eq 'ARRAY') { foreach my $a (@{$heap->{queued}}) { $heap->{server}->put($a); } delete $heap->{queued}; } $kernel->yield(login_complete => splice(@_,ARG0)); } elsif ($input->{Message} && lc($input->{Message}) eq 'authentication failed') { print STDERR "Authentication failed.\n"; $heap->{_connected} = 0; $heap->{_logged_in} = 0; $kernel->yield("shutdown"); } } }, callbacks => sub { my ($kernel, $heap, $session, $input) = @_[KERNEL, HEAP, SESSION, ARG0]; # TODO this stuff needs some work next unless (ref($input)); my $qual = 0; foreach my $k (keys %{$heap->{params}->{CallBacks}}) { my $match = 0; if (ref($heap->{params}->{CallBacks}{$k}) eq 'HASH') { foreach my $c (keys %{$heap->{params}->{CallBacks}{$k}}) { last if ($match == 1); if (exists($input->{$c}) && $heap->{params}->{CallBacks}{$k}{$c} eq $input->{$c}) { $match = 2; $qual++; } else { $match = 1; } } # matched ALL of the callback (not 2 of them like it looks like) if ($match == 2) { # callback good DEBUG && print STDERR "callback $k is good\n"; $kernel->yield($k => $input); } } elsif ($heap->{params}->{CallBacks}{$k} eq ':all' || $heap->{params}->{CallBacks}{$k} eq 'default') { $kernel->yield($k => $input); } else { print STDERR "Incorrectly written callback $k\n"; } } # use the :all qualifier now #if ($qual == 0) { # $kernel->yield("default" => splice(@_,ARG0)); #} }, }, ); DEBUG && print STDERR "Client started.\n"; } sub _stop { $_[KERNEL]->yield("shutdown"); DEBUG && print STDERR "Client stopped.\n"; } # Handle incoming signals (INT) # TODO disconnect gracefully sub signals { my $signal_name = $_[ARG0]; # DEBUG && print STDERR "Client caught SIG$signal_name\n"; # do not handle the signal return 0; } 1; package POE::Filter::Asterisk::Manager; use strict; use Carp qw(croak); sub DEBUG { 0 }; sub new { my $type = shift; my $self = { buffer => '', crlf => "\x0D\x0A", }; bless $self, $type; $self; } sub get { my ($self, $stream) = @_; # Accumulate data in a framing buffer. $self->{buffer} .= join('', @$stream); my $many = []; while (1) { my $input = $self->get_one([]); if ($input) { push(@$many,@$input); } else { last; } } return $many; } sub get_one_start { my ($self, $stream) = @_; DEBUG && do { my $temp = join '', @$stream; $temp = unpack 'H*', $temp; warn "got some raw data: $temp\n"; }; # Accumulate data in a framing buffer. $self->{buffer} .= join('', @$stream); } sub get_one { my $self = shift; return [] if ($self->{finish}); if ($self->{buffer} =~ s#^(?:Asterisk|Aefirion) Call Manager(?: Proxy)?/(\d+\.\d+\w*)$self->{crlf}##is) { return [{ acm_version => $1 }]; } return [] unless ($self->{crlf}); my $crlf = $self->{crlf}; # collect lines in buffer until we find a double line return [] unless($self->{buffer} =~ m/${crlf}${crlf}/s); $self->{buffer} =~ s/(^.*?)(${crlf}${crlf})//s; my $buf = "$1${crlf}"; my $kv = {}; foreach my $line (split(/(:?${crlf})/,$buf)) { my $tmp = $line; $tmp =~ s/\r|\n//g; next unless($tmp); if ($line =~ m/([\w\-]+)\s*:\s+(.*)/) { my $key = $1; my $val = $2; DEBUG && print "recv key $key: $val\n"; if ($key eq 'Variable',) { for my $v( split /\r/, $val ) { $v =~ s/^Variable\:\s*//; my @parts = split /=/, $v; $kv->{$key}{$parts[0]} = $parts[1]; DEBUG && print " recv variable: $parts[0] => $parts[1]\n"; } } else { $kv->{$key} = $val; } } else { $kv->{content} .= "$line"; } } return (keys %$kv) ? [$kv] : []; } sub put { my ($self, $hrefs) = @_; my @raw; for my $i ( 0 .. $#{$hrefs} ) { if (ref($hrefs->[$i]) eq 'HASH') { foreach my $k (keys %{$hrefs->[$i]}) { DEBUG && print "send key $k: $hrefs->[$i]{$k}\n"; push(@raw,"$k: $hrefs->[$i]{$k}$self->{crlf}"); } } elsif (ref($hrefs->[$i]) eq 'ARRAY') { push(@raw, join("$self->{crlf}", @{$hrefs->[$i]}, "")); } elsif (ref($hrefs->[$i]) eq 'SCALAR') { push(@raw, $hrefs->[$i]); } else { croak "unknown type ".ref($hrefs->[$i])." passed to ".__PACKAGE__."->put()"; } push(@raw,"$self->{crlf}"); } \@raw; } sub get_pending { my $self = shift; return [ $self->{buffer} ] if length $self->{buffer}; return undef; } 1; __END__ =head1 NAME POE::Component::Client::Asterisk::Manager - Event-based Asterisk / Aefirion Manager Client =head1 SYNOPSIS use POE qw( Component::Client::Asterisk::Manager ); POE::Component::Client::Asterisk::Manager->new( Alias => 'monitor', RemoteHost => 'localhost', RemotePort => 5038, # default port CallBacks => { input => ':all', # catchall for all manager events ring => { 'Event' => 'Newchannel', 'State' => 'Ring', }, }, inline_states => { input => sub { my $input = $_[ARG0]; # good for figuring out what manager events look like require Data::Dumper; print Data::Dumper->Dump([$input]); }, ring => sub { my $input = $_[ARG0]; # $input is a hash ref with info from print STDERR "RING on channel $input->{Channel}\n"; }, }, ); $poe_kernel->run(); =head1 DESCRIPTION POE::Component::Client::Asterisk::Manager is an event driven Asterisk manager client. This module should also work with Aefirion. =head1 METHODS =head2 new() This method creates a POE::Component::Client::TCP session and works inside that session. You can specify the alias, host, port and inline_states. See the synopsis for an example. =head1 CALLBACKS Callbacks are events that meet a criteria specified in a hash ref For example: ring => { 'Event' => 'Newchannel', 'State' => 'Ring', }, The event 'ring' will be called with a hash href in ARG0 when the component receives a manager event matching 'Newchannel' and manager state 'Ring'. You can specify a catch all event like this: catch_all => ':all' Note: This was changed from 'default' to ':all' in an effort to make it more clear. 'default' will also work. =head1 BUGS Please report them via RT: L =head1 EXAMPLES There are a few examples in the examples directory that can get you going. =head1 AUTHOR David Davis, Exantus@cpan.orgE =head1 SEE ALSO perl(1), POE::Filter::Asterisk::Manager, L, L, L =cut