Package tigase.server

Class AbstractMessageReceiver

java.lang.Object
tigase.server.BasicComponent
tigase.server.AbstractMessageReceiver
All Implemented Interfaces:
ClusteredComponentIfc, Configurable, XMPPService, ConfigurationChangedAware, Initializable, MessageReceiver, PacketWriterWithTimeout, ServerComponent, StatisticsContainer, StatisticsContainerIfc, VHostListener
Direct Known Subclasses:
AbstractComponent, AbstractKernelBasedComponent, AmpComponent, ConnectionManager, EchoComponent, MessageRouter, MonitorComponent, SessionManager

public abstract class AbstractMessageReceiver extends BasicComponent implements StatisticsContainer, MessageReceiver, PacketWriterWithTimeout
This is an archetype for all classes processing user-level packets. The implementation is designed for a heavy packets processing with internal queues and number of separate threads depending on number of CPUs. Extensions of the class can process normall user packets and administrator packets via ad-hoc commands. Good examples of such components are MUC, PubSub, SessionManager.
The class offers scripting API for administrator ad-hoc commands.
By default it internally uses priority queues which in some rare cases may lead to packets reordering. When this happens and it is unacceptable for the deployment non-priority queues can be used. The queues size is limited and depends on the available memory size.
Packets are processed by processPacket(Packet packet) method which is concurrently called from multiple threads.
Created: Tue Nov 22 07:07:11 2005
Author:
Artur Hefczyc
  • Field Details

    • INCOMING_FILTERS_PROP_KEY

      public static final String INCOMING_FILTERS_PROP_KEY
      Configuration property key for setting incoming packets filters on the component level.
      See Also:
    • INCOMING_FILTERS_PROP_VAL

      public static final String INCOMING_FILTERS_PROP_VAL
      Configuration property default vakue with a default incoming packet filter loaded by Tigase server.
      This is a comma-separated list of classes which should be loaded as packet filters. The classes must implement PacketFilterIfc interface.
      See Also:
    • MAX_QUEUE_SIZE_PROP_KEY

      public static final String MAX_QUEUE_SIZE_PROP_KEY
      Configuration property key allowing to overwrite a default (memory size dependent) size for the component internal queues. By default the queue size is adjusted to the available memory size to avoid out of memory errors.
      See Also:
    • MAX_QUEUE_SIZE_PROP_VAL

      public static final Integer MAX_QUEUE_SIZE_PROP_VAL
      A default value for max queue size property. The value is calculated at the server startup time using following formula:
      Runtime.getRuntime().maxMemory() / 400000L You can change the default queue size by setting a different value for the MAX_QUEUE_SIZE_PROP_KEY property in the server configuration.
    • OUTGOING_FILTERS_PROP_KEY

      public static final String OUTGOING_FILTERS_PROP_KEY
      Configuration property key for setting outgoing packets filters on the component level. This is a comma-separated list of classes which should be loaded as packet filters. The classes must implement PacketFilterIfc interface.
      See Also:
    • OUTGOING_FILTERS_PROP_VAL

      public static final String OUTGOING_FILTERS_PROP_VAL
      Configuration property default vakue with a default outgoing packet filter loaded by Tigase server.
      This is a comma-separated list of classes which should be loaded as packet filters. The classes must implement PacketFilterIfc interface.
      See Also:
    • PACKET_DELIVERY_RETRY_COUNT_PROP_KEY

      public static final String PACKET_DELIVERY_RETRY_COUNT_PROP_KEY
      See Also:
    • SCHEDULER_THREADS_PROP_KEY

      public static final String SCHEDULER_THREADS_PROP_KEY
      Configuration property key for setting number of threads used by component ScheduledExecutorService.
      See Also:
    • SECOND

      protected static final long SECOND
      Constant used in time calculation procedures. Indicates a second that is 1000 milliseconds.
      See Also:
    • MINUTE

      protected static final long MINUTE
      Constant used in time calculation procedures. Indicates a minute that is 60 SECONDs.
      See Also:
    • HOUR

      protected static final long HOUR
      Constant used in time calculation procedures. Indicates a hour that is 60 MINUTEs.
      See Also:
    • maxInQueueSize

      protected int maxInQueueSize
    • maxOutQueueSize

      protected int maxOutQueueSize
    • maxQueueSize

      @ConfigField(desc="Maximum size of internal queues", alias="max-queue-size") protected int maxQueueSize
  • Constructor Details

    • AbstractMessageReceiver

      public AbstractMessageReceiver()
  • Method Details

    • addPacket

      public boolean addPacket(Packet packet)
      Method adds a Packet object to the internal input queue. Packets from the input queue are later passed to the processPacket(Packet) method. This is a blocking method waiting if necessary for the room if the queue is full.
      The method returns a boolean value of true if the packet has been successfully added to the queue and false otherwise.
      There can be many queues and many threads processing packets for the component, however the method makes the best effort to guarantee that packets are later processed in the correct order. For example that packets for a single user always end up in the same exact queue. You can tweak the packets distribution among threads by overwriting hashCodeForPacket(Packet) method.
      If there is N threads the packets are distributed among thread using following logic:
       int threadNo = Math.abs(hashCodeForPacket(packet) % N);
       

      This is a preferred method to be used by most Tigase components. If the queues are full the component should stop and wait for more room. The blocking methods aim to prevent from the system overloading or wasting resources for generating packets which can't be processed anyway.
      Specified by:
      addPacket in interface MessageReceiver
      Parameters:
      packet - is a Packet instance being put to the component internal input queue.
      Returns:
      a boolean value of true if the packet has been successfully added to the queue and false otherwise.
    • addPacketNB

      public boolean addPacketNB(Packet packet)
      This is a variant of addPacket(Packet) method which adds Packet to in the internal input queue without blocking.
      The method returns a boolean value of true if the packet has been successful added to the queue and false otherwise.
      Use of the non-blocking methods is not recommended for most of the components implementations. The only component which is allowed to use them is the server MessageRouter implementation which can not hang on any method. This would cause a dead-lock in the application. All other components must use blocking methods and wait if the system is under so high load that it's queues are full.
      See addPacket(Packet) method's documentation for some more details.
      Specified by:
      addPacketNB in interface MessageReceiver
      Parameters:
      packet - is a Packet instance being put to the component internal input queue.
      Returns:
      a boolean value of true if the packet has been successfully added to the queue and false otherwise.
      See Also:
    • addPackets

      public boolean addPackets(Queue<Packet> packets)
      This is a convenience method for adding all packets stored in given queue to the component's internal input queue.
      The method calls addPacket(Packet) in a loop for each packet in the queue. If the call returns true then the packet is removed from the given queue, otherwise the methods ends the loop and returns false.
      Please note, if the method returns true it means that all the packets from the queue passed as a parameter have been successfuly run through the addPacket(Packet) method and the queue passed as a parameter should be empty. If the method returns false then at least one packet from the parameter queue wasn't successfully run through the addPacket(Packet) method. If the method returns false then the queue passed as a parameter is not empty and it contains packet which was unseccessfully run through the addPacket(Packet) method and all the packets which were not run at all.
      Specified by:
      addPackets in interface MessageReceiver
      Parameters:
      packets - is a Queue of packets for adding to the component internal input queue. All the packets are later processed by processPacket(Packet) method in the same exact order if they are processed by the same thread. See documentation hashCodeForPacket(Packet) method how to control assiging packets to particular threads.
      Returns:
      a boolean value of true if all packets has been successfully added to the component's internal input queue and false otherwise.
      See Also:
    • addRegexRouting

      public void addRegexRouting(String address)
      Method adds a new routing address for the component. Routing addresses are used by the MessageRouter to calculate packet's destination. If the packet's destination address matches one of the component's routing addresses the packet is added to the component's internal input queue.
      By default all components accept packets addressed to the componentId and to:
       component.getName() + '@' + any virtual domain
       

      TODO: The future implementation most likely accept packets addressed to:
       any virtual domain + '/' + component.getName()
       

      instead.
      The routings are passed as Java regular expression strings are the extra addresses accepted by the component. In most cases this is used by the external component protocol implementations which can dynamically change accepted addresses depending on the connected external components.
      Parameters:
      address - is a Java regular expression string for the packet's destination address accepted by this component.
    • addTimerTask

      public void addTimerTask(TimerTask task, long delay)
      Method queues and executes timer tasks using ScheduledExecutorService which allows using more than one thread for executing tasks.
    • addTimerTask

      public void addTimerTask(TimerTask task, long initialDelay, long period)
    • addTimerTaskWithTimeout

      public void addTimerTaskWithTimeout(TimerTask task, long delay, long timeout)
      Method queues and executes timer tasks using ScheduledExecutorService which allows using more than one thread for executing tasks. It allows to set a timeout to cancel long running tasks
      Parameters:
      task - a task implementing TimerTask
      delay - in milliseconds delay after which task will be started
      timeout - in milliseconds after which task will be cancelled disregarding whether it has finished or not
    • addTimerTaskWithTimeout

      public void addTimerTaskWithTimeout(TimerTask task, long delay, long period, long timeout)
      Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; please refer to ScheduledExecutorService.scheduleAtFixedRate(java.lang.Runnable, long, long, java.util.concurrent.TimeUnit) javadoc for details. It utilizes Tigase TimerTask and allows setting a timeout to cancel long running tasks
      Parameters:
      task - a task implementing TimerTask
      delay - in milliseconds, the time to delay first execution
      period - in milliseconds, the period between successive executions
      timeout - in milliseconds after which task will be cancelled disregarding whether it has finished or not
    • clearRegexRoutings

      public void clearRegexRoutings()
      Method clears, removes all the component routing addresses. After this method call the component accepts only packets addressed to default routings that is component ID or the component name + '@' + virtual domains
    • everyHour

      public void everyHour()
      Utility method executed precisely every hour. A component can overwrite the method to put own code to be executed at the regular intervals of time.
      Note, no extensive calculations should happen in this method nor long lasting operations. It is essential that the method processing does not exceed 1 hour. The overriding method must call the the super method first and only then run own code.
      Overrides:
      everyHour in class BasicComponent
    • everyMinute

      public void everyMinute()
      Utility method executed precisely every minute. A component can overwrite the method to put own code to be executed at the regular intervals of time.
      Note, no extensive calculations should happen in this method nor long lasting operations. It is essential that the method processing does not exceed 1 minute. The overriding method must call the the super method first and only then run own code.
      Overrides:
      everyMinute in class BasicComponent
    • everySecond

      public void everySecond()
      Utility method executed precisely every second. A component can overwrite the method to put own code to be executed at the regular intervals of time.
      Note, no extensive calculations should happen in this method nor long lasting operations. It is essential that the method processing does not exceed 1 second. The overriding method must call the the super method first and only then run own code.
      Overrides:
      everySecond in class BasicComponent
    • hashCodeForPacket

      public int hashCodeForPacket(Packet packet)
      This method decides how incoming packets are distributed among processing threads. Different components needs different distribution to efficient use all threads and avoid packets re-ordering.
      If there are N processing threads, packets are distributed among threads using following code:
       int threadNo = Math.abs(hashCodeForPacket(packet) % N);
       

      For a PubSub component, for example, a better packets distribution would be based on the PubSub channel name, for SM a better distribution is based on the destination address, etc....
      Parameters:
      packet - is a Packet which needs to be processed by some thread.
      Returns:
      a hash code generated for the input thread.
    • newPacketId

      public String newPacketId(String prefix)
    • processingInThreads

      public int processingInThreads()
      Concurrency control method. Returns preferable number of threads set for this component.
      Returns:
      preferable number of threads set for this component.
    • processingOutThreads

      public int processingOutThreads()
      Concurrency control method. Returns preferable number of threads set for this component.
      Returns:
      preferable number of threads set for this component.
    • processOutPacket

      public void processOutPacket(Packet packet)
      By default this method just copies the given packet between queue. This method operates on packets which have been already processed somehow by the component so usually the default action is the best one, however some components in rare cases may choose to process packets differently. In most cases this method should not be overridden.
      Parameters:
      packet - is an output packet which normally has to go to other component for further processing.
    • processPacket

      public abstract void processPacket(Packet packet)
      This is the main Packet processing method. It is called concurrently from many threads so implementing it in thread save manner is essential. The method is called for each packet addressed to the component.
      Please note, the Packet instance may be processed by different parts of the server, different components or plugins at the same time. Therefore this is very important to tread the Packet instance as unmodifiable object.
      Processing in this method is asynchronous, therefore there is no result value. If there are some 'result' packets generated during processing, they should be passed back using addOutPacket(Packet) method.
      Parameters:
      packet - is an instance of the Packet class passed for processing.
    • processPacket

      public final void processPacket(Packet packet, Queue<Packet> results)
      Description copied from interface: ServerComponent
      processPacket is a blocking processing method implemented by all components. This method processes packet and returns results instantly without waiting for any resources.
      Specified by:
      processPacket in interface ServerComponent
      Overrides:
      processPacket in class BasicComponent
      Parameters:
      packet - a Packet value
    • release

      public void release()
      Description copied from interface: ServerComponent
      Method called when component is being stopped and unloaded.
      Specified by:
      release in interface ServerComponent
      Overrides:
      release in class BasicComponent
    • removeRegexRouting

      public boolean removeRegexRouting(String address)
    • schedulerThreads

      public int schedulerThreads()
      Method returns default number of threads used by SchedulerExecutorService
      Returns:
      a value of int
    • start

      public void start()
      Specified by:
      start in interface MessageReceiver
    • stop

      public void stop()
    • getRegexRoutings

      public Set<Pattern> getRegexRoutings()
      Method returns a Set with all component's routings as a compiled regular expression patterns. The Set can be empty but it can not be null.
      Returns:
      a Set with all component's routings as a compiled regular expression patterns.
    • getStatistics

      public void getStatistics(StatisticsList list)
      Method returns component statistics. Please note, the method can be called every second by the server monitoring system therefore no extensive or lengthy calculations are allowed. If there are some statistics requiring lengthy operations like database access they must have Level.FINEST assigned and must be put inside the level guard to prevent generating them by the system monitor. The system monitor does not collect FINEST statistics.
      Level guard code looks like the example below:
       if (list.checkLevel(Level.FINEST)) {
         // Some CPU intensive calculations or lengthy operations
         list.add(getName(), "Statistic description", stat_value, Level.FINEST);
       }
      
       
      This way you make sure your extensive operation is not executed every second by the monitoring system and does not affect the server performance.
      Specified by:
      getStatistics in interface StatisticsContainerIfc
      Overrides:
      getStatistics in class BasicComponent
      Parameters:
      list - is a StatistcsList where all statistics are stored.
    • isInRegexRoutings

      public boolean isInRegexRoutings(String address)
      Specified by:
      isInRegexRoutings in interface MessageReceiver
    • setIncoming_filters

      public void setIncoming_filters(PacketFiltersBean.IncomingPacketFiltersBean incoming_filters)
    • setOutgoing_filters

      public void setOutgoing_filters(PacketFiltersBean.OutgoingPacketFiltersBean outgoing_filters)
    • updateFiltersName

      protected void updateFiltersName()
    • beanConfigurationChanged

      public void beanConfigurationChanged(Collection<String> changedFields)
      Description copied from interface: ConfigurationChangedAware
      Method called when configuration was applied to bean.
      Specified by:
      beanConfigurationChanged in interface ConfigurationChangedAware
      Overrides:
      beanConfigurationChanged in class BasicComponent
      Parameters:
      changedFields - collection of field names which were changed
    • setCompId

      public void setCompId(JID jid)
      Overrides:
      setCompId in class BasicComponent
    • setName

      public void setName(String name)
      Description copied from interface: ServerComponent
      Method used to assign component name (localpart of the component)
      Specified by:
      setName in interface ServerComponent
      Overrides:
      setName in class BasicComponent
      Parameters:
      name - to be assigned
    • setParent

      public void setParent(MessageReceiver parent)
      Specified by:
      setParent in interface MessageReceiver
    • setSchedulerThreads_size

      public void setSchedulerThreads_size(int size)
    • addOutPacketWithTimeout

      public boolean addOutPacketWithTimeout(Packet packet, ReceiverTimeoutHandler handler, long delay, TimeUnit unit)
    • addOutPacketWithTimeout

      public boolean addOutPacketWithTimeout(Packet packet, Duration timeout, PacketWriterWithTimeout.Handler handler)
      Specified by:
      addOutPacketWithTimeout in interface PacketWriterWithTimeout
    • addOutPacket

      protected boolean addOutPacket(Packet packet)
    • addOutPacketNB

      protected boolean addOutPacketNB(Packet packet)
      Non blocking version of addOutPacket.
      Parameters:
      packet - a Packet value
      Returns:
      a boolean value
    • addOutPackets

      protected boolean addOutPackets(Queue<Packet> packets)
    • addTimerTask

      protected void addTimerTask(TimerTask task, long delay, TimeUnit unit)
      Method queues and executes timer tasks using ScheduledExecutorService which allows using more than one thread for executing tasks.
    • getMaxQueueSize

      protected Integer getMaxQueueSize(int def)
    • getResourceForPacketWithTimeout

      protected String getResourceForPacketWithTimeout()