Source code for ginsfsm.gobj

# -*- encoding: utf-8 -*-
"""
Introduction
============

An :term:`GObj` is an object that has an inside :term:`simple-machine`
that defines its behavior.

The communication between :term:`gobj`'s happens via :term:`event`'s:

:class:`GObj` has the next methods:

* :meth:`GObj.start_up`
* :meth:`GObj.create_gobj`
* :meth:`GObj.destroy_gobj`
* :meth:`GObj.send_event`
* :meth:`GObj.post_event`
* :meth:`GObj.post_event_from_outside`
* :meth:`GObj.broadcast_event`
* :meth:`GObj.subscribe_event`
* :meth:`GObj.delete_subscription`
* :meth:`GObj.set_owned_event_filter`
* :meth:`GObj.overwrite_parameters`
* :meth:`GObj.overwrite_few_parameters`

And this inherited from :class:`ginsfsm.smachine.SMachine`:

* :meth:`ginsfsm.smachine.SMachine.set_new_state`
* :meth:`ginsfsm.smachine.SMachine.get_current_state`
* :meth:`ginsfsm.smachine.SMachine.get_event_list`
* :meth:`ginsfsm.smachine.SMachine.get_output_event_list`


:class:`GObj` has the next public attributes:

* :attr:`GObj.name`
* :attr:`GObj.parent`
* :attr:`GObj.dl_childs`

Events to manage events:

* sending events:

    * sending events by direct delivery: :meth:`GObj.send_event`.
    * sending events by queues: :meth:`GObj.post_event`.
    * sending events from outside world: :meth:`GObj.post_event_from_outside`.
    * sending events to subscribers: :meth:`GObj.broadcast_event`.


* receiving events:

    * directly from another :term:`gobj`'s who knows you.
    * subscribing to events by the :meth:`GObj.subscribe_event` method.
    * you can filtering events being broadcasting with
      :meth:`GObj.set_owned_event_filter` method.


Event
=====

.. autoclass:: Event
    :members:


GObj
====

.. autoclass:: GObj
    :members: start_up, create_gobj, destroy_gobj,
        send_event, post_event, broadcast_event, post_event_from_outside,
        subscribe_event, delete_subscription, set_owned_event_filter,
        overwrite_parameters, overwrite_few_parameters

    .. attribute:: name

        My name.

        Set by :meth:`create_gobj`.

    .. attribute:: parent

        My parent, destination of my events... or not.

        Set by :meth:`create_gobj`.

    .. attribute:: dl_childs

        Set of my gobj childs. Me too like be parent!.


    .. method:: set_new_state

        Set a new state.
        Method to used inside actions, to force the change of state.

        :param new_state: new state to set.

        ``new_state`` must match some of the state names of the
        machine's :term:`state-list` or a :exc:`StateError` exception
        will be raised.

    .. method:: get_current_state

        Return the name of the current state.

        If there is no state it returns ``None``.

    .. method:: get_event_list

       Return the list with the :term:`input-event`'s names.

    .. method:: get_output_event_list

       Return the list with the :term:`output-event`'s names.


Exceptions
==========

.. autoexception:: GObjError

.. autoexception:: EventError

.. autoexception:: StateError

.. autoexception:: MachineError

.. autoexception:: EventNotAcceptedError

"""
import re
import ginsfsm.globals  # made it import available
from ginsfsm.compat import string_types
from ginsfsm.smachine import (
    SMachine,
    EventError,
    EventNotAcceptedError,  # made it import available
    StateError,  # made it import available
    MachineError,  # made it import available
)
from ginsfsm.gconfig import (
    GConfig,
    add_gconfig,
)
from ginsfsm.deferred import Deferred
from ginsfsm.globals import global_get_gobj


[docs]class GObjError(Exception): """ General GObj exception"""
[docs]class Event(object): """ Collect event properties. This is the argument received by actions. :param destination: destination gobj whom to send the event. :param event_name: event name. :param source: list with path of gobj sources. Firt item ``source[0]`` is the original sender gobj. Last item ``source[-1]`` is the nearest sender gobj. :param kw: keyword arguments with associated data to event. """ # For now, event_factory is private. Automatically using by send_event... #Use the :meth:`GObj.event_factory` factory function to create Event #instances. def __init__(self, destination, event_name, source, **kw): self.destination = destination self.event_name = event_name if not isinstance(source, (list, tuple)): source = [source] self.source = source self.kw = kw.copy() self.__dict__.update(**kw) def copy(self): pass def __repr__(self): return 'Event object: name %r, destination: %r, kw: %r' % ( self.event_name, self.destination, self.kw, ) def __str__(self): return 'Event object: name %r, destination: %r, kw: %r' % ( self.event_name, self.destination, self.kw, )
class _Subscription(object): """ Collect subscriber properties `event_name`: event name. `subscriber_gobj`: subcriber gobj to sending event. `kw`: event parameters """ def __init__(self, me, event_name, subscriber_gobj, **kw): self.me = me self.event_name = event_name self.subscriber_gobj = subscriber_gobj self.kw = kw self.__dict__.update(**kw) def __repr__(self): resp = "Subscription:\n me : %r\n ev : %r\n sub: %r\n" % ( self.me, self.event_name, self.subscriber_gobj) return resp GOBJ_GCONFIG = { 'trace_traverse': [bool, False, 0, None, 'Trace traverse search'], 'trace_creation': [ bool, False, GConfig.FLAG_DIRECT_ATTR, None, 'Trace creation/destroy of gobj'], 'trace_subscription': [bool, False, 0, None, 'Trace event subscribing'], '__new_subscriptor_event__': [ str, None, 0, None, 'If exists, the subscribe_event method will send that event to the obj' ' to inform of a new subscriber.'], 're_name': [ str, None, 0, None, 'Regular expression name to search the gobj in the resource tree.' 'Used in Pyramid traversal.'], # trace_mach is inherited from SMachine. 'trace_mach': [ bool, False, GConfig.FLAG_DIRECT_ATTR, None, 'Trace machine activity' ], # logger is inherited from SMachine. 'logger': [None, None, GConfig.FLAG_DIRECT_ATTR, None, ''], 'gaplic': [None, None, GConfig.FLAG_DIRECT_ATTR, None, 'My grand-father'], 'create_gobj': [None, None, GConfig.FLAG_DIRECT_ATTR, None, ''], 'destroy_gobj': [None, None, GConfig.FLAG_DIRECT_ATTR, None, ''], '_increase_inside': [None, None, GConfig.FLAG_DIRECT_ATTR, None, ''], '_decrease_inside': [None, None, GConfig.FLAG_DIRECT_ATTR, None, ''], '_tab': [None, None, GConfig.FLAG_DIRECT_ATTR, None, ''], '__unique_name__': [ bool, False, GConfig.FLAG_DIRECT_ATTR, None, 'If using gaplic, unique name' ], } _urandom_name = 0
[docs]class GObj(SMachine, GConfig): """ Well, yes, I'm a very simple brain. Only a machine. But write a good FSM, and I never fail you. Derive me, and write my FSM. Sample GObj:: class MyGObj(GObj): def __init__(self): GObj.__init__(self, FSM, GCONFIG) def start_up(self): ''' Initialization zone.''' :param fsm: FSM :term:`simple-machine`. :param gconfig: GCONFIG :term:`gconfig-template`. """ # name and parent attributes must be Location-Aware Resources # for Pyramid framework compatibility: # changed to point to __name__ and __parent__. @property def name(self): return self.__name__ @name.setter def name(self, name): self.__name__ = name @property def parent(self): return self.__parent__ @parent.setter def parent(self, parent): self.__parent__ = parent def __init__(self, fsm, gconfig=None): SMachine.__init__(self, fsm) self.name = '' """ My name. Set by :meth:`create_gobj` """ self.parent = None """My parent, destination of my events... or not. Set by :meth:`create_gobj` """ self.dl_childs = set() # my childs... me too like be parent. """List of gobj childs. """ self.owned_event_filter = None # TODO debe ser una lista de filtros """Filter to broadcast_event function to check the owner of events. """ self._dl_subscriptions = set() # uauuu, how many fans!! self._some_subscriptions = False self._destroyed = False # mark as destroyed when destroy_gobj() self._re_compiled_name = '' # re compiled name when using re_name self.re_matched_name = '' # matched name when using re_name self.trace_creation = False gconfig = add_gconfig(gconfig, GOBJ_GCONFIG) GConfig.__init__(self, gconfig, self.logger) def __del__(self): if self.trace_creation: self.logger and self.logger.info("Destroyed! <-- %r" % (self)) def __str__(self): name = "%s" % (self.name) parent = self.parent while(parent): parent_name = "%s" % (parent.name) name = parent_name + '.' + name parent = parent.parent return "%s" % (name) def __repr__(self): name = "%s:%s" % (self.__class__.__name__, self.name) parent = self.parent while(parent): parent_name = "%s:%s" % (parent.__class__.__name__, parent.name) name = parent_name + '.' + name parent = parent.parent return "'%s: %s'" % (name, 'Destroyed' if self._destroyed else "Lived")
[docs] def create_gobj(self, name, gclass, parent, **kw): """ Factory function to create gobj's instances. :param name: Name of the gobj. :param gclass: `gclass` is the :class:`GObj` type used to create the new gobj. It's must be a derived class of :class:`GObj` otherwise a :exc:`GObjError` exception will be raised. :param parent: parent of the new :term:`gobj`. ``None`` if it has no parent. If it's not ``None``, then must be a derived class of :class:`GObj` otherwise a :exc:`GObjError` exception will be raised. :param kw: Attributes that are added to the new :term:`gobj`. All the keyword arguments used in the creation function **are added as attributes** to ``config`` object. You must consult the attributes supported by each `gclass` type. The attributes must be defined in the gclass GCONFIG, otherwise they are ignored. Special kw: ``'__random_name__'``: Generate a random gobj name TODO: doc re_name :rtype: new gobj instance. The factory funcion does: * Add the :term:`gobj` to their parent child list :attr:`GObj.dl_childs`, * If it's a :term:`named-gobj` call the :meth:`GObj.register_unique_gobj`. * Call :meth:`GObj.start_up`. * Add to the :term:`gobj` several attributes: * **name**: name of the created :term:`gobj`. * **parent**: the parent :term:`gobj` of the created :term:`gobj`. """ if gclass is None: raise GObjError( '''ERROR create_gobj(): No GObj class supplied.''') if not issubclass(gclass, GObj): raise GObjError( '''ERROR create_gobj(): class '%r' is NOT a GObj subclass''' % (gclass)) if parent is not None: if not isinstance(parent, GObj): raise GObjError( '''ERROR create_gobj(): parent '%r' ''' '''is NOT a GObj subclass''' % (gclass)) self.__random_name__ = kw.pop('__random_name__', False) if self.__random_name__: name = self.get_random_name(name) gobj = gclass() if name: gobj.name = name # Who wins? arguments or file ini_settings? # ini global (in gaplic) wins. gobj.write_parameters(**kw) if self.gaplic and self.gaplic.ini_settings is not None: # ini global win. gobj.overwrite_parameters(0, **self.gaplic.ini_settings) if parent is not None: parent._add_child(gobj) if gobj.config.re_name: gobj._re_compiled_name = re.compile(gobj.config.re_name) if gobj.trace_creation: self.logger and self.logger.info("Creating --> '%s:%s'" % ( gclass.__name__, name)) if gobj.__unique_name__: registered = self.gaplic._register_unique_gobj(gobj) if not registered: raise GObjError( "ERROR create_gobj():" " cannot _register_unique_gobj '%s' " % (gobj.name)) gobj.start_up() if gobj.trace_creation and self.logger: if gobj.__unique_name__: unique = '!' else: unique = ' ' self.logger.info("Created <--%s '%r'" % (unique, gobj)) return gobj
def get_random_name(self, prefix): global _urandom_name _urandom_name += 1 if prefix: return '%s_%d' % (prefix, _urandom_name) else: return '%d' % (_urandom_name)
[docs] @staticmethod def destroy_gobj(gobj): """ Destroy a gobj """ if gobj.trace_creation and gobj.logger: gobj.logger.info("Destroying --> %r" % (gobj)) if gobj._destroyed: if gobj.logger: gobj.logger.error( "ERROR reentering in destroy_gobj: %r" % gobj) return gobj._destroyed = True if gobj.parent is not None: gobj.parent._remove_child(gobj) while len(gobj.dl_childs): try: for child in gobj.dl_childs: if not child._destroyed: GObj.destroy_gobj(child) except RuntimeError: pass # "Set changed size during iteration" is OK gobj.delete_all_subscriptions(force=True) gobj.go_out() del gobj
[docs] def start_up(self): """ Initialization zone. Well, the __init__ method is used to build the FSM so I need another function to initialize the new gobj. Please, **override me**, and write here all the code you need to start up the machine: create your owns childs, etc. This function is called by :meth:`create_gobj` after creating the gobj instance. """
def go_out(self): """ Finish zone. Please, **override me** to do extra work when the gobj is being destroyed. In this point, all childs and subscriptions are already deleted. """ def _resolv_destination(self, destination): """ Resolve the destination gobj. If destination it's a string: * check is gaplic exists. * try to resolv the destination by gaplic. If the string destination cannot be resolved then return the string. It must be resolved by gaplic router. """ if not (isinstance(destination, (string_types, GObj))): raise GObjError( 'Destination gobj ("%s") must be a string or a GObj instance' % (destination) ) if isinstance(destination, string_types): if not self.gaplic: raise GObjError( 'When destination gobj ("%s") is a string, ' 'you need a GAplic!' % (destination) ) named_gobj = self.gaplic.find_unique_gobj(destination) if not named_gobj: raise GObjError('GObj %r UNKNOWN in %r' % ( destination, self.gaplic)) destination = named_gobj return destination def _event_factory(self, destination, event, **kw): """ Factory to create Event instances. :param destination: destination gobj whom send the event. :param event: an :term:`event`. :param kw: keyword arguments with associated data to event. :rtype: Return Event instance. ``event`` must be a `string` or :class:`Event` types, otherwise a :exc:`EventError` will be raised. If ``event`` is an :class:`Event` instance, a new :class:`Event` duplicated instance is returned, but it will be updated with the new ``destination`` and ``kw`` keyword arguments. .. note:: All the keyword arguments used in the factory function **are added as attributes** to the created :term:`event` instance. You must consult the attributes supported by each machine's event. """ if not (isinstance(event, string_types) or isinstance(event, Event)): raise EventError( 'Event "%r" must be a string or Event instance' % (event,)) # if destination is not None: if not (isinstance(destination, string_types) or isinstance(destination, GObj) or isinstance(destination, Deferred) ): raise GObjError( 'Destination "%r" must be a string or GObj/Deferred instance' % (destination,)) if isinstance(event, Event): # duplicate the event if event.source[-1] != self: event.source.append(self) if len(kw): event.kw.update(**kw) event = Event( event.destination, event.event_name, event.source, **event.kw ) if destination is not None: event.destination = destination else: event = Event(destination, event, self, **kw) return event
[docs] def send_event(self, destination, event, **kw): """ Send ``event`` to ``destination``, with associated event data ``kw``. :param destination: Must be a string gobj name or a :term:`gobj` instance, otherwise a :exc:`GObjError` will be raised. If it's a string and the gobj name cannot be resolved inside the current :term:`gaplic`, then the event is :func:`post_event` in order to be resolved by the gaplic router. :param event: Must be a string event name or a :class:`Event` instance, otherwise an :exc:`EventError` will be raised. :param kw: All the keyword arguments **are added as attributes** to the sent :class:`Event` instance. You must consult the attributes supported by each machine's event. :rtype: return the returned value from the executed action. Return ``None`` if the event has been :func:`post_event`. If the :term:`event-name` exists in the machine, but it's not accepted by the current state, then no exception is raised but the function **returns** :exc:`EventNotAcceptedError`. .. note:: The :meth:`inject_event` method doesn't **raise** :exc:`EventNotAcceptedError` because a :term:`machine` should run under any circumstances. In any way an action can raise exceptions. """ destination = self._resolv_destination(destination) event = self._event_factory(destination, event, **kw) if isinstance(destination, string_types): # string gobjs must be resolved by gaplic return self.post_event(destination, event) if destination._destroyed: self.logger and self.logger.error( "ERROR internal: sending event %r to a destroyed gobj %r" % ( event.event_name, destination) ) return -1 ret = destination.inject_event(event) return ret
[docs] @staticmethod def post_event_from_outside(gaplic_name, gobj_name, event, **kw): """ Same funcionality as :func:`post_event`, but the event is sent from non-gobj world. """ gaplic = ginsfsm.globals.global_get_gaplic(gaplic_name) if not gaplic: raise Exception("CHECK NAMES!! gaplic %s doesn't exist" % ( gaplic_name,)) gobj = global_get_gobj(gobj_name, gaplic_name) if not gobj: raise Exception("CHECK NAMES!! gobj %s doesn't exist" % ( gobj_name,)) gaplic.post_event(gobj, event, **kw)
[docs] def post_event(self, destination, event, **kw): """ Same funcionality as :func:`send_event`, but the event is processed by gaplic. If the destination is inside of the current gaplic the event will be sent in the next loop cycle. """ event = self._event_factory(destination, event, **kw) if not self.gaplic: raise GObjError( 'To use post_event, you need a GAplic!' ) self.gaplic.enqueue_event(event)
[docs] def broadcast_event(self, event, **kw): """ Broadcast the ``event`` to all subscribers. :param event: :term:`event` to send. :param kw: keyword argument with data associated to event. .. note:: All the keyword arguments **are added as attributes** to the sent :term:`event`. Use this function when you don't know who are your event's clients, when you don't know the :term:`gobj` destination of your :term:`output-event`'s If there is no subscriptors, the event is not sent. When an event has several subscriptors, there is a mechanism called :term:`event-filter` that allows to a subcriptor to own the event and no further spread by more subscribers. The filter function set by :meth:`set_owned_event_filter` method, is call with the returned value of an :term:`action` as argument: If the filter function return ``True``, the event is owned, and the :func:`ginsfsm.gobj.GObj.broadcast_event` function doesn't continue sending the event to other subscribers. .. note:: If :func:`ginsfsm.gobj.GObj.broadcast_event` function uses :func:`ginsfsm.gobj.GObj.post_event`, the :term:`event-filter` cannot be applied. """ if self.config.trace_subscription and self.logger: self.logger.info('BROADCAST event: %r, gobj: %r' % (event, self)) if not self._some_subscriptions: return subscriptions = self._dl_subscriptions.copy() sended_gobj = set() # don't repeat events for sub in subscriptions: if sub.subscriber_gobj in sended_gobj: continue oevent = self._event_factory(sub.subscriber_gobj, event, **kw) if None in sub.event_name or \ oevent.event_name in sub.event_name: if hasattr(sub, '__rename_event_name__'): oevent.kw.update( { 'original_event_name': oevent.event_name, } ) oevent.event_name = sub.__rename_event_name__ if hasattr(sub, '__subscription_reference__'): oevent.kw.update( { '__subscription_reference__': sub.__subscription_reference__, } ) oevent.event_name = sub.__rename_event_name__ ret = False if isinstance(sub.subscriber_gobj, Deferred): # outside world if hasattr(sub, '__deferred_witout_oevent__'): sub.subscriber_gobj() else: sub.subscriber_gobj(event=oevent) else: # gobj-ecosistema if hasattr(sub, '__use_post_event__'): ret = self.post_event(sub.subscriber_gobj, oevent) else: ret = self.send_event(sub.subscriber_gobj, oevent) if self.owned_event_filter: ret = self.owned_event_filter(ret) if ret is True: return # propietary event sended_gobj.add(sub.subscriber_gobj)
[docs] def subscribe_event(self, event_name, subscriber_gobj, **kw): """ Subscribe to an event. :param event_name: string event name or tuple/list of string event names. If ``event_name`` is ``None`` then it subscribes to all events. If it's not ``None`` then it must be a valid event name from the :term:`output-event` list, otherwise a :exc:`EventError` will be raised. :param subscriber_gobj: subscriber obj that wants receive the event. ``subscriber_gobj`` must be: * `None`: the subscriber is the parent. * `string`: the subscriber is a :term:`unique-named-gobj`. * type :class:`GObj` instance. * Deferred callback. otherwise otherwise a :exc:`GObjError` will be raised. :param kw: keyword arguments. :return: the subscription object. Possible values for **kw** arguments: * `__use_post_event__`: ``bool`` You must set it to `True` in order to broadcast the events using `post-event` instead of `send-event`. * `__rename_event_name__`: `'new event name'` You can rename the output original event name. The :attr:`original_event_name` attribute is added to the sent event with the value of the original event name. * `__hard_subscription__`: ``bool`` True for permanent subscription. This subscription cannot be remove, neither with delete_all_subscriptions(). (Well, with force you can) * `__subscription_reference__`: ``str`` If exists, it will be added as kw in the event broadcast. Can be used by the subscriptor for general purposes. * `__deferred_witout_oevent__`: ``Bool`` If True, the callback is called without event keyword parameter. * `__first_shot__`: ``Bool`` If True then generate a broadcast event to the new subscriber. It's necesary to have __new_subscriptor_event__ flag, and it's responsability of the gobj to do the first shot. """ #if subscriber_gobj is not None: if not (isinstance(subscriber_gobj, string_types) or isinstance(subscriber_gobj, Deferred) or isinstance(subscriber_gobj, GObj) ): raise GObjError('Bad type of subscriber_gobj %r' % ( subscriber_gobj,)) output_events = self.get_output_event_list() if not isinstance(event_name, (list, tuple)): event_name = (event_name,) for name in event_name: if name is None: continue if not isinstance(name, string_types): raise EventError( 'subscribe_event(): event %r is not string in %r' % (name, self)) if name not in output_events: raise EventError( 'subscribe_event(): output-event %r not defined in' ' %r' % (event_name, self)) existing_subs = self._find_subscription(event_name, subscriber_gobj) if existing_subs: # avoid duplication subscriptions if self.logger: self.logger.warning( "WARNING duplicate subscription:\n" " me : %r\n" " ev : %r\n" " sub: %r\n" % (self, event_name, subscriber_gobj)) self.delete_subscription(event_name, subscriber_gobj) subscription = _Subscription(self, event_name, subscriber_gobj, **kw) if self.config.trace_subscription and self.logger: self.logger.info('NEW %r' % subscription) self._dl_subscriptions.add(subscription) self._some_subscriptions = True if self.config.__new_subscriptor_event__: self.send_event( self, self.config.new_susbcriptor_event, subscription=subscription) return subscription
def _find_subscription(self, event_name, subscriber_gobj): """ Find a subscription by event_name and subscriber gobj. Internal use to avoid duplicates subscriptions. """ if not isinstance(event_name, (list, tuple)): event_name = (event_name,) for sub in self._dl_subscriptions: if sorted(list(sub.event_name)) == sorted(list(event_name)): if sub.subscriber_gobj == subscriber_gobj: return sub return None def delete_subscription_by_object(self, subscription): if subscription in self._dl_subscriptions: if self.config.trace_subscription and self.logger: self.logger.info('DEL %r' % subscription) self._dl_subscriptions.remove(subscription) if len(self._dl_subscriptions) == 0: self._some_subscriptions = False return True return False
[docs] def delete_subscription(self, event_name, subscriber_gobj): """ Remove `subscription`. :param event_name: string event name or tuple/list of string event names. :param subscriber_gobj: subscriber gobj. """ sub = self._find_subscription(event_name, subscriber_gobj) if self.config.trace_subscription and self.logger: self.logger.info('DEL %r' % sub) if sub: if sub.kw.get('__hard_subscription__', None): if self.logger: self.logger.error( "WARNING cannot delete a hard subscription(): '%s'" % ( event_name)) return False else: self._dl_subscriptions.remove(sub) if len(self._dl_subscriptions) == 0: self._some_subscriptions = False return True if self.logger: self.logger.error( "ERROR delete_subscription(): '%s' NOT FOUND " % (event_name)) return False
def delete_all_subscriptions(self, force=False): """ Remove all subscriptions. """ subscriptions = self._dl_subscriptions.copy() for sub in subscriptions: if not force: if sub.kw.get('__hard_subscription__'): continue if self.config.trace_subscription and self.logger: self.logger.info('DEL %r' % sub) self._dl_subscriptions.remove(sub) if len(self._dl_subscriptions) == 0: self._some_subscriptions = False
[docs] def set_owned_event_filter(self, filter): """ Set a filter function to be used by :meth:`broadcast_event` function to check the owner of events. """ self.owned_event_filter = filter
def _add_child(self, gobj): """ Add a child ``gobj``. :param gobj: :term:`gobj` child to add. Raise :exc:`GObjError` is ``gobj`` already has a parent. This function is called by :meth:`create_gobj` after creating the gobj instance. """ if gobj.parent: raise GObjError('GObj "%r" already has parent' % (gobj)) self.dl_childs.add(gobj) gobj.parent = self def _remove_child(self, gobj): """ Remove the child ``gobj``. :param gobj: :term:`gobj` child to remove. This function is called by :meth:`destroy_gobj`. """ if gobj in self.dl_childs: self.dl_childs.remove(gobj) gobj.parent = None def __getitem__(self, name): """ Enable gobjs tree to work with Pyramid traversal URL dispatch. """ trace_traverse = self.config.trace_traverse if trace_traverse and self.logger: self.logger.debug("==> TRAVERSING('%s') -> %s" % ( name, self.resource_path())) if name is None: raise KeyError('Parameter name cannot be None') # Firstly search no re_name for gobj in self.dl_childs: if gobj.config.re_name: continue else: if gobj.name == name: if trace_traverse and self.logger: self.logger.debug(" TRAVERSING FOUND!") return gobj # Secondly search re_name for gobj in self.dl_childs: if gobj.config.re_name: if gobj._re_compiled_name.match(name) is not None: # this gobj has been got as re_matched_name. gobj.re_matched_name = name if trace_traverse and self.logger: self.logger.debug(" TRAVERSING %s FOUND RE!" % name) return gobj else: continue if trace_traverse and self.logger: self.logger.debug("<== TRAVERSING('%s') -> %s END!" % ( name, self.resource_path())) raise KeyError('No such child named %s' % name) def __iter__(self): """ Iterates over child elements. """ return self.dl_childs.__iter__() # helper for pyramid def resource_path(self, separator='/'): """ Same as Pyramid traversal.resource_path, but without escaping. """ path = [loc.__name__ or '' for loc in lineage(self)] path.reverse() return path and separator.join([x for x in path]) or separator
[docs] def overwrite_parameters(self, level, **settings): """ The parameters in settings must be defined in the gobj. :param level: level trace of childs. indicates the depth of the childs as far to change. * `0` only this gobj. * `-1` all tree of childs. :param settings: parameters and their values. The settings are filtered by the named-gobj or gclass name of this gobj. The parameter name in settings, must be a dot-named, with the first item being the named-gobj o gclass name. """ parameters = self.filter_parameters(**settings) self.write_parameters(**parameters) if level > 0: for child in self.dl_childs: child.overwrite_parameters(level - 1, **settings)
[docs] def overwrite_few_parameters(self, parameter_list, **settings): """ The parameters in settings must be defined in the gobj. :param parameter_list: write only the parameters in ``parameter_list``. :param settings: parameters and their values. The settings are filtered by the named-gobj or gclass name of this gobj. The parameter name in settings, must be a dot-named, with the first item being the named-gobj o gclass name. """ parameters = self.filter_parameters(**settings) self.write_few_parameters(parameter_list, **parameters)
def inside(resource1, resource2): """Is ``resource1`` 'inside' ``resource2``? Return ``True`` if so, else ``False``. ``resource1`` is 'inside' ``resource2`` if ``resource2`` is a :term:`lineage` ancestor of ``resource1``. It is a lineage ancestor if its parent (or one of its parent's parents, etc.) is an ancestor. """ while resource1 is not None: if resource1 is resource2: return True resource1 = resource1.__parent__ return False def lineage(resource): """ Return a generator representing the :term:`lineage` of the :term:`resource` object implied by the ``resource`` argument. The generator first returns ``resource`` unconditionally. Then, if ``resource`` supplies a ``__parent__`` attribute, return the resource represented by ``resource.__parent__``. If *that* resource has a ``__parent__`` attribute, return that resource's parent, and so on, until the resource being inspected either has no ``__parent__`` attribute or which has a ``__parent__`` attribute of ``None``. For example, if the resource tree is:: thing1 = Thing() thing2 = Thing() thing2.__parent__ = thing1 Calling ``lineage(thing2)`` will return a generator. When we turn it into a list, we will get:: list(lineage(thing2)) [ <Thing object at thing2>, <Thing object at thing1> ] """ while resource is not None: yield resource # The common case is that the AttributeError exception below # is exceptional as long as the developer is a "good citizen" # who has a root object with a __parent__ of None. Using an # exception here instead of a getattr with a default is an # important micro-optimization, because this function is # called in any non-trivial application over and over again to # generate URLs and paths. try: resource = resource.__parent__ except AttributeError: resource = None