#!/usr/bin/perl -w # dbiserver.pl use strict; use DBI; use IO::File; use IO::Socket; use IO::Select; use Data::Dumper; # for debug # ------------------------------------------------------------------------------ # Constants # server socket pathname my $dbiserver_socket_pathname = "/tmp/dbiserver.sock"; # types my $TYPE1_GENERAL = 0; my $TYPE_CONNECT = 0x0000; my $TYPE1_DATABASE = 1; my $TYPE_DB_DISCONNECT = 0x0100; my $TYPE_DB_TABLES = 0x0101; my $TYPE_DB_TEST_TABLE = 0x0102; my $TYPE_DB_PREPARE = 0x0103; my $TYPE_DB_DO = 0x0104; my $TYPE_DB_SELECTROW = 0x0105; my $TYPE_DB_SELECTALL = 0x0106; my $TYPE_DB_SELECTCOL = 0x0107; my $TYPE1_STATEMENT = 2; my $TYPE_ST_EXECUTE = 0x0200; my $TYPE_ST_FETCHROW = 0x0201; my $TYPE_ST_FINISH = 0x0202; my $TYPE_ST_FETCHALL = 0x0203; # error codes - 0 is OKAY my $ERROR_UNKNOWN_TYPE = 1; my $ERROR_DBI_FAIL = 2; my $ERROR_INVALID_HANDLE = 3; my $ERROR_TOO_MANY_HANDLES = 4; my $ERROR_SYNTAX = 5; # ------------------------------------------------------------------------------ # Globals my $running = 1; # I've made these 'global' for convenience, so I can use subs without complex / inefficient parameters my ($client, $request, $reply, $status); # State information per client my %client = (); # State information per database my %database = (); # ------------------------------------------------------------------------------ # Signals $SIG{PIPE} = sub { print STDERR "SIGPIPE - ignored\n"; }; $SIG{INT} = sub { print STDERR "SIGINT - terminating\n"; $running = 0; }; $SIG{__DIE__} = sub { print STDERR "DIE : @_ - terminating\n"; exit 0; }; $SIG{__WARN__} = sub { print STDERR "WARN : @_ - ignored\n"; }; # ------------------------------------------------------------------------------ # Select my $select = IO::Select->new(); # Listening socket unlink $dbiserver_socket_pathname; my $listen_socket = IO::Socket::UNIX->new(Local => $dbiserver_socket_pathname, Listen => 1) || die "Can't connect to $dbiserver_socket_pathname for listen"; chmod 0777, $dbiserver_socket_pathname; $select->add($listen_socket); # MAIN SELECT LOOP while ($running) { # wait for something to happen my @handles = $select->can_read(undef); for my $handle (@handles) { if ($handle eq $listen_socket) { # it's the listening socket - accept a new client my $client_socket = $handle->accept; my $fileno = $client_socket->fileno; $select->add($client_socket); # prepare per-client state for this client $client = $client{$client_socket} = { fileno=>$fileno, request=>'', dbhs=>[], sths=>[], st_by_db=>{} }; message("Accepting a new client"); } else { # client is sending us a request $client = $client{$handle}; my $read_len = $handle->sysread($client->{request}, 63*1024, length($client->{request})); if (not defined $read_len) { # read error message("Error reading from client: $!"); } elsif ($read_len == 0) { # client has closed connection message("Client closed connection"); # close all client databases (this takes care of statements also) my $dbhs = $client->{dbhs}; for my $db_index (0.. $#$dbhs) { close_client_database($db_index) if defined $dbhs->[$db_index]; } message(" Buffer was not empty:\n$client->{request}") unless $client->{request} eq ''; $select->remove($handle); delete $client{$handle}; } else { # we have at least the length field (first 2 bytes) for (;;) { # it may be that more than one request has been received my $req_len_field = unpack('N', $client->{request}); my $req_len = length($client->{request}); if (not defined $req_len_field or $req_len < $req_len_field) { # we wait for more last; } else { # we have the whole request - possibly more! # update the buffer with any excess, and chop that off the request $request = substr $client->{request}, 0, $req_len_field, ''; &process_request; # send the reply - this should autoflush $handle->print($reply); # if there are more requests pending, last if $client->{request} eq ''; } } } } } # for $handle (@handles) } # ------------------------------------------------------------------------------ # Get rid of the socket unlink $dbiserver_socket_pathname; # ------------------------------------------------------------------------------ # Process a client's request sub process_request { $reply = ''; $status = 0; # chop off the length field substr $request, 0, 4, ''; # get the common fields, type and client_ref my ($type, $client_ref) = unpack 'nn', substr $request, 0, 4, ''; my $type1 = $type >> 8; if ($type1 == $TYPE1_GENERAL) { # it's a general command if ($type == $TYPE_CONNECT) { process_connect($request); } else { reply_error($ERROR_UNKNOWN_TYPE); } } elsif ($type1 == $TYPE1_DATABASE) { # it's a database handle command my $db_index = unpack 'n', substr($request, 0, 2, ''); if (not defined $db_index) { reply_error($ERROR_INVALID_HANDLE); } else { my $dbh = $client->{dbhs}[$db_index]; if (not defined $dbh) { reply_error($ERROR_INVALID_HANDLE); } else { if ($type == $TYPE_DB_DISCONNECT) { process_db_disconnect($dbh, $db_index); } elsif ($type == $TYPE_DB_TABLES) { process_db_tables($dbh); } elsif ($type == $TYPE_DB_TEST_TABLE) { process_db_test_table($dbh); } elsif ($type == $TYPE_DB_PREPARE) { process_db_prepare($dbh, $db_index); } elsif ($type == $TYPE_DB_DO) { process_db_do($dbh); } elsif ($type == $TYPE_DB_SELECTROW) { process_db_selectrow($dbh); } elsif ($type == $TYPE_DB_SELECTALL) { process_db_selectall($dbh); } elsif ($type == $TYPE_DB_SELECTCOL) { process_db_selectcol($dbh); } else { reply_error($ERROR_UNKNOWN_TYPE); } } } } elsif ($type1 == $TYPE1_STATEMENT) { # it's a statement handle command my $st_index = unpack 'n', substr($request, 0, 2, ''); if (not defined $st_index) { reply_error($ERROR_INVALID_HANDLE); } else { my $sth = $client->{sths}[$st_index]; if (not defined $sth) { reply_error($ERROR_INVALID_HANDLE); } else { if ($type == $TYPE_ST_EXECUTE) { process_st_execute($sth); } elsif ($type == $TYPE_ST_FETCHROW) { process_st_fetchrow($sth); } elsif ($type == $TYPE_ST_FINISH) { process_st_finish($sth, $st_index); } elsif ($type == $TYPE_ST_FETCHALL) { process_st_fetchall($sth); } else { reply_error($ERROR_UNKNOWN_TYPE); } } } } else { reply_error($ERROR_UNKNOWN_TYPE); } $reply = (pack 'Nnnn', length($reply) + 10, $type, $client_ref, $status) . $reply; } sub reply_error { message("Sending error to client: @_"); $status = shift; $reply = shift || ''; } sub process_connect { my $dbh; message("Connecting to database: $request"); # try to connect - but DON'T DIE!!! # note: this doesn't support username / passwd yet eval { local ($SIG{__DIE__}, $SIG{__WARN__}); $dbh = DBI->connect("dbi:$request"); };#, "", "", { PrintErrors => 0 }) }; unless ($dbh) { reply_error($ERROR_DBI_FAIL, DBI->errstr || $@); return; } my $databases = $client->{dbhs}; # $db_index is our database handle, just an index into the array of databases for this client my $db_index = 0; ++$db_index while defined $databases->[$db_index]; if ($db_index > 0xFFFF) { # too many database connections reply_error($ERROR_TOO_MANY_HANDLES); return; } $databases->[$db_index] = $dbh; $reply = pack 'n', $db_index; # we return the id of this database } sub process_db_disconnect { my ($dbh, $db_index) = @_; if ($request ne '') { reply_error($ERROR_SYNTAX); return; } message("Disconnecting from database #$db_index"); unless (close_client_database($db_index)) { reply_error($ERROR_DBI_FAIL, DBI->errstr); return; } } sub close_client_database { my $db_index = shift; my $okay = 1; # finish all client statements for this database my $st_indexes = $client->{st_by_db}{$db_index}; for (keys %$st_indexes) { $okay &&= finish_client_statement($_) } # close the database my $dbhs = $client->{dbhs}; my $dbh = $dbhs->[$db_index]; $okay &&= $dbh->disconnect; if ($db_index == $#$dbhs) { pop @$dbhs; } else { undef $dbhs->[$db_index]; } delete $client->{st_by_db}{$db_index}; return $okay; } sub process_db_tables { my $dbh = shift; if ($request ne '') { reply_error($ERROR_SYNTAX); return; } message("Listing Tables"); $reply = pack_list([$dbh->tables]); } # pack a 'binary protocol list' sub pack_list { my $list = shift; my $packed = pack 'N', scalar @$list; for my $item (@$list) { $packed .= defined $item ? (pack 'N', length $item) . $item : pack 'N', 0xFFFFFFFF; } return $packed; } # unpack a 'binary protocol' list # this will return undef if there is a protocol error sub unpack_list { my $packed = shift; my $n = unpack 'N', substr $packed, 0, 4, ''; # return unless defined $n; my @list = (); while ($packed) { my $len = unpack 'N', substr $packed, 0, 4, ''; # return unless defined $len and $len <= length($packed); push @list, ($len == 0xFFFFFFFF) ? undef : substr $packed, 0, $len, ''; } # return unless $n == @list; return \@list;#wantarray ? @list : \@list; } sub process_db_test_table { my $dbh = shift; message("Testing for table '$request'"); $reply = "\0"; for my $table ($dbh->tables) { if ($table eq $request) { $reply = "\1"; last } } } sub process_db_prepare { my ($dbh, $db_index) = @_; message("Preparing statement"); my $sth = $dbh->prepare($request); unless ($sth) { reply_error($ERROR_DBI_FAIL, DBI->errstr); return; } my $sths = $client->{sths}; my $st_by_db = $client->{st_by_db}; # $st_index is our statement handle, just an index into the array of statements for this client my $st_index = 0; ++$st_index while defined $sths->[$st_index]; if ($st_index > 0xFFFF) { # too many statements reply_error($ERROR_TOO_MANY_HANDLES); return; } $sths->[$st_index] = $sth; $st_by_db->{$db_index}{$st_index} = 1; $reply = pack 'n', $st_index; # we return the id of this statement } sub process_db_do { my ($dbh) = @_; message("Doing command"); my $sql_len = unpack 'n', substr $request, 0, 2, ''; my $sql = substr $request, 0, $sql_len, ''; my $bind = length($request) ? unpack_list($request) : []; unless ($bind) { reply_error($ERROR_SYNTAX); return; } my $rv; eval { local ($SIG{__DIE__}, $SIG{__WARN__}); $rv = $dbh->do($sql, {}, @$bind) }; unless ($rv) { reply_error($ERROR_DBI_FAIL, DBI->errstr || $@); return; } } sub process_db_selectrow { my ($dbh) = @_; message("Selecting Row"); my $sql_len = unpack 'n', substr $request, 0, 2, ''; my $sql = substr $request, 0, $sql_len, ''; my $bind = length($request) ? unpack_list($request) : []; unless ($bind) { reply_error($ERROR_SYNTAX); return; } my @row; eval { local ($SIG{__DIE__}, $SIG{__WARN__}); @row = $dbh->selectrow_array($sql, {}, @$bind); }; if (defined $dbh->err) { reply_error($ERROR_DBI_FAIL, DBI->errstr); return; } $reply = pack_list([@row]); } sub process_db_selectall { my ($dbh) = @_; message("Selecting All Rows"); my $sql_len = unpack 'n', substr $request, 0, 2, ''; my $sql = substr $request, 0, $sql_len, ''; my $bind = length($request) ? unpack_list($request) : []; unless ($bind) { reply_error($ERROR_SYNTAX); return; } my $rows; eval { local ($SIG{__DIE__}, $SIG{__WARN__}); $rows = $dbh->selectall_arrayref($sql, {}, @$bind); }; unless ($rows) { reply_error($ERROR_DBI_FAIL, DBI->errstr); return; } $reply = pack_list([map {pack_list($_)} @$rows]); } sub process_db_selectcol { my ($dbh) = @_; message("Selecting Column"); my $sql_len = unpack 'n', substr $request, 0, 2, ''; my $sql = substr $request, 0, $sql_len, ''; my $bind = length($request) ? unpack_list($request) : []; unless ($bind) { reply_error($ERROR_SYNTAX); return; } my $col; eval { local ($SIG{__DIE__}, $SIG{__WARN__}); $col = $dbh->selectcol_arrayref($sql, {}, @$bind); }; unless ($col) { reply_error($ERROR_DBI_FAIL, DBI->errstr); return; } $reply = pack_list($col); } sub process_st_execute { my $sth = shift; my $bind = unpack_list($request); message("Executing statement"); unless ($bind) { reply_error($ERROR_SYNTAX); return; } # try to execute - but DON'T DIE!!! my $rv; eval { local ($SIG{__DIE__}, $SIG{__WARN__}); $rv = $sth->execute(@$bind) }; unless ($rv) { reply_error($ERROR_DBI_FAIL, DBI->errstr || $@); return; } $reply = (pack 'N', $sth->rows) . pack_list($sth->{NAME}); } sub process_st_fetchrow { my $sth = shift; message("Fetching row"); my $row = $sth->fetchrow_arrayref; if (defined $sth->err) { reply_error($ERROR_DBI_FAIL, DBI->errstr); return; } $reply = pack_list(defined $row ? $row : []); } sub process_st_finish { my ($sth, $st_index) = @_; if ($request ne '') { reply_error($ERROR_SYNTAX); return; } message("Finish statement #$st_index"); unless (finish_client_statement($st_index)) { reply_error($ERROR_DBI_FAIL, DBI->errstr); return; } } sub process_st_fetchall { my $sth = shift; message("Fetching all rows"); my $rows = $sth->fetchall_arrayref; unless ($rows) { reply_error($ERROR_DBI_FAIL, DBI->errstr); return; } $reply = pack_list([map {pack_list($_)} @$rows]); } sub finish_client_statement { my $st_index = shift; my $sths = $client->{sths}; my $sth = $sths->[$st_index]; my $ret = $sth->finish; if ($st_index == $#$sths) { pop @$sths; } else { undef $sths->[$st_index]; } # get rid of the entry in st_by_db my $st_by_db = $client->{st_by_db}; # we try to delete it from each databases statements while (my ($db_index, $h_st_index) = each %$st_by_db) { delete $h_st_index->{$st_index}; } return $ret; } sub message { # print $client->{fileno}, ": ", shift, "\n"; }