#!/usr/bin/env python # -*- coding: utf-8 -*- # # Author: Andreas Büsching # # generic notifier implementation # # Copyright (C) 2004, 2005, 2006 # Andreas Büsching # # This library is free software; you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License version # 2.1 as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301 USA """Simple mainloop that watches sockets and timers.""" # python core packages from copy import copy from select import select from select import error as select_error from time import time import os, sys import socket # internal packages import log import dispatch IO_READ = 1 IO_WRITE = 2 IO_EXCEPT = 4 __sockets = {} __sockets[ IO_READ ] = {} __sockets[ IO_WRITE ] = {} __sockets[ IO_EXCEPT ] = {} __timers = {} __timer_id = 0 __min_timer = None __in_step = False __step_depth = 0 __step_depth_max = 0 _options = { 'recursive_depth' : 2, } def socket_add( id, method, condition = IO_READ ): """The first argument specifies a socket, the second argument has to be a function that is called whenever there is data ready in the socket. The callback function gets the socket back as only argument.""" global __sockets __sockets[ condition ][ id ] = method def socket_remove( id, condition = IO_READ ): """Removes the given socket from scheduler. If no condition is specified the default is IO_READ.""" global __sockets if __sockets[ condition ].has_key( id ): del __sockets[ condition ][ id ] def timer_add( interval, method ): """The first argument specifies an interval in milliseconds, the second argument a function. This is function is called after interval seconds. If it returns true it's called again after interval seconds, otherwise it is removed from the scheduler. The third (optional) argument is a parameter given to the called function. This function returns an unique identifer which can be used to remove this timer""" global __timer_id try: __timer_id += 1 except OverflowError: __timer_id = 0 __timers[ __timer_id ] = \ [ interval, int( time() * 1000 ) + interval, method ] return __timer_id def timer_remove( id ): """Removes the timer identifed by the unique ID from the main loop.""" if __timers.has_key( id ): del __timers[ id ] def dispatcher_add( method ): global __min_timer __min_timer = dispatch.MIN_TIMER dispatch.dispatcher_add( method ) dispatcher_remove = dispatch.dispatcher_remove __current_sockets = {} __current_sockets[ IO_READ ] = [] __current_sockets[ IO_WRITE ] = [] __current_sockets[ IO_EXCEPT ] = [] ( INTERVAL, TIMESTAMP, CALLBACK ) = range( 3 ) def step( sleep = True, external = True ): """Do one step forward in the main loop. First all timers are checked for expiration and if necessary the accociated callback function is called. After that the timer list is searched for the next timer that will expire. This will define the maximum timeout for the following select statement evaluating the registered sockets. Returning from the select statement the callback functions from the sockets reported by the select system call are invoked. As a final task in a notifier step all registered external dispatcher functions are invoked.""" global __in_step, __step_depth, __step_depth_max __in_step = True __step_depth += 1 try: if __step_depth > __step_depth_max: log.exception( 'maximum recursion depth reached' ) __step_depth -= 1 __in_step = False return # handle timers _copy = __timers.copy() for i, timer in _copy.items(): timestamp = timer[ TIMESTAMP ] if not timestamp: # prevert recursion, ignore this timer continue now = int( time() * 1000 ) if timestamp <= now: # Update timestamp on timer before calling the callback to # prevent infinite recursion in case the callback calls # step(). timer[ TIMESTAMP ] = 0 try: if not timer[ CALLBACK ](): if __timers.has_key( i ): del __timers[ i ] else: # Find a moment in the future. If interval is 0, we # just reuse the old timestamp, doesn't matter. now = int( time() * 1000 ) if timer[ INTERVAL ]: timestamp += timer[ INTERVAL ] while timestamp <= now: timestamp += timer[ INTERVAL ] timer[ TIMESTAMP ] = timestamp except ( KeyboardInterrupt, SystemExit ), e: __step_depth -= 1 __in_step = False raise e except: log.exception( 'removed timer %d' % i ) if __timers.has_key( i ): del __timers[ i ] # if it causes problems to iterate over probably non-existing # timers, I think about adding the following code: # if not __in_step: # break # get minInterval for max timeout timeout = None if not sleep: timeout = 0 else: now = int( time() * 1000 ) for t in __timers: interval, timestamp, callback = __timers[ t ] if not timestamp: # timer is blocked (recursion), ignore it continue nextCall = timestamp - now if timeout == None or nextCall < timeout: if nextCall > 0: timeout = nextCall else: timeout = 0 break if timeout == None: timeout = dispatch.MIN_TIMER if __min_timer and __min_timer < timeout: timeout = __min_timer r = w = e = () try: r, w, e = select( __sockets[ IO_READ ].keys(), __sockets[ IO_WRITE ].keys(), __sockets[ IO_EXCEPT ].keys(), timeout / 1000.0 ) except ( ValueError, select_error ): log.exception( 'error in select' ) sys.exit( 1 ) for sl in ( ( r, IO_READ ), ( w, IO_WRITE ), ( e, IO_EXCEPT ) ): sockets, condition = sl # append all unknown sockets to check list for s in sockets: if not s in __current_sockets[ condition ]: __current_sockets[ condition ].append( s ) while len( __current_sockets[ condition ] ): sock = __current_sockets[ condition ].pop( 0 ) is_socket = isinstance( sock, ( socket.socket, file, socket._socketobject ) ) if ( is_socket and sock.fileno() != -1 ) or \ ( isinstance( sock, int ) and sock != -1 ): if __sockets[ condition ].has_key( sock ): try: if not __sockets[ condition ][ sock ]( sock ): socket_remove( sock, condition ) except ( KeyboardInterrupt, SystemExit ), e: raise e except: log.exception( 'error in socket callback' ) sys.exit( 1 ) # handle external dispatchers if external: try: dispatch.dispatcher_run() except ( KeyboardInterrupt, SystemExit ), e: raise e except Exception, e: __step_depth -= 1 __in_step = False log.exception( 'error in dispatcher function' ) raise e except ( KeyboardInterrupt, SystemExit ), e: __step_depth -= 1 __in_step = False raise e __step_depth -= 1 __in_step = False def loop(): """Executes the 'main loop' forever by calling step in an endless loop""" while 1: step() def _init(): global __step_depth_max __step_depth_max = _options[ 'recursive_depth' ]