Chapter 22. Component Implementation - Lesson 3 - Multi-Threading

Artur Hefczyc <artur.hefczyc@tigase.net> v2.0, June 2014: Reformatted for AsciiDoc. :toc: :numbered: :website: http://tigase.net/ :Date: 2010-01-06 20:22

Multi core and multi CPU machines are nowadays very common. Especially for the application like the XMPP server you most likely deploy your service on a server with a few cores or even a few CPUs. Your new component however processes all packets in a single thread.

This is especially important if the packet processing is CPU expensive like, for example, SPAM checking. In such a case you could experience single Core/CPU usage at 100% while other Cores/CPUs are idling. Ideally, you want your component to use all available CPUs.

The Tigase API offers a very simple way to execute component’s processPacket(Packet packet) method in multiple threads. Methods int processingOutThreads() and int processingInThreads() returns number of threads assigned to the component. By default it returns just 1 as not all component implementations are prepared to process packets concurrently. By overwriting the method you can return any value you think is appropriate for the implementation. Please note, there are two methods, one is for a number of threads for incoming packets to the component and another for outgoing packets from the component. It used to be a single method but different components have different needs and the best performance can be achieved when the outgoing queues have a separate threads pool from incoming queues. Also some components only receive packets while other only send, therefore assigning an equal number of threads for both could be a waste of resources.

If the packet processing is CPU bound only, you normally want to have as many threads as there are CPUs available:

@Override
public int processingInThreads() {
  return Runtime.getRuntime().availableProcessors();
}
@Override
public int processingOutThreads() {
  return Runtime.getRuntime().availableProcessors();
}

If the processing is I/O bound (network or database) you probably want to have much more threads to process requests. It is hard to guess ideal number of threads, instead you should run a few tests to see what exact number is best for the component implementation.

Now you have many threads for processing your packets. There is one slight problem with this, however. In many cases packets order is essential. If our processPacket(…​) method is executed concurrently by a few threads it is quite possible that a message sent to user can takeover the message sent earlier. Especially if the first message was large and the second was small. We can prevent this by adjusting method responsible for packets distribution among threads.

The algorithm for packets distribution among threads is very simple:

int thread_idx = hashCodeForPacket(packet) % threads_total;

So the key here is hashCodeForPacket(…​) method. By overwriting it we can make sure that all packets addressed to the same user will always be processed by the same thread:

@Override
public int hashCodeForPacket(Packet packet) {
  if (packet.getElemTo() != null) {
    return packet.getElemTo().hashCode();
  }
  // This should not happen, every packet must have a destination
  // address, but maybe our SPAM checker is used for checking
  // strange kind of packets too....
  if (packet.getElemFrom() != null) {
    return packet.getElemFrom().hashCode();
  }
  // If this really happens on your system you should look
  // carefully at packets arriving to your component and
  // find a better way to calculate hashCode
  return 1;
}

Above two methods give a control over the number of threads assigned to the packets processing in your component and to the packets distribution among threads. This is not all the Tigase API has to offer in terms of multi-threading.

Sometimes you want to perform some periodic actions. You can of course create Timer instance and load it with TimerTasks but as there might be a need for this on every level of the Class hierarchy you could end-up with multiple Timer (threads in fact) objects doing similar job and using resources. There are a few methods which allow you to reuse common Timer object to perform all sorts of actions.

First, you have three methods allowing your to perform some periodic actions:

public synchronized void everySecond();
public synchronized void everyMinute();
public synchronized void everyHour();

An example implementation for periodic notifications sent to some address could look like this one:

@Override
public synchronized void everyMinute() {
  super.everyMinute();
  if ((++delayCounter) >= notificationFrequency) {
    addOutPacket(Packet.getMessage(abuseAddress, getComponentId(),
      StanzaType.chat, "Detected spam messages: " + spamCounter,
      "Spam counter", null, newPacketId("spam-")));
    delayCounter = 0;
    spamCounter = 0;
  }
}

This method sends every 'notificationFrequency' minutes a message to 'abuseAddress' reporting how many spam messages have been detected during last period. Please note, you have to call super.everyMinute() to make sure other actions are executed as well and you have to also remember to keep processing in this method to minimum, especially if you overwrite everySecond() method.

There are also two methods which allow you to schedule tasks executed at certain time, they are very similar to the java.util.Timer API with the only difference is that Timer is reused among all levels of Class hierarchy. There is a separate Timer for each Class instance though, to avoid interferences between separate components:

addTimerTask(TimerTask task, long delay, TimeUnit unit);
addTimerTask(TimerTask task, long delay);

There is one more method which can be overwritten which is not directly related to multi-threading but might be very helpful for executing some actions at a very specific point of time. This is the point of time when the server has just been initialised, that is all components have been created and received their configuration for the first time. When this happens the Tigase calls void initializationCompleted() method for each server component. You can overwrite this method to execute some actions at the time when you are sure the the Tigase server has started and is fully functional.

And here is a code of an example component which uses all the API discussed in this article:

import java.util.Arrays;
import java.util.Map;
import java.util.logging.Logger;
import tigase.server.AbstractMessageReceiver;
import tigase.server.Packet;
import tigase.util.JIDUtils;
import tigase.xmpp.StanzaType;

public class TestComponent extends AbstractMessageReceiver {

  private static final Logger log =
    Logger.getLogger(TestComponent.class.getName());

  private static final String BAD_WORDS_KEY = "bad-words";
  private static final String WHITELIST_KEY = "white-list";
  private static final String PREPEND_TEXT_KEY = "log-prepend";
  private static final String SECURE_LOGGING_KEY = "secure-logging";
  private static final String ABUSE_ADDRESS_KEY = "abuse-address";
  private static final String NOTIFICATION_FREQ_KEY = "notification-freq";

  private String[] badWords = {"word1", "word2", "word3"};
  private String[] whiteList = {"admin@localhost"};
  private String prependText = "Spam detected: ";
  private String abuseAddress = "abuse@locahost";
  private int notificationFrequency = 10;
  private int delayCounter = 0;
  private boolean secureLogging = false;
  private long spamCounter = 0;

  @Override
  public void processPacket(Packet packet) {
    // Is this packet a message?
    if ("message" == packet.getElemName()) {
      String from = JIDUtils.getNodeID(packet.getElemFrom());
      // Is sender on the whitelist?
      if (Arrays.binarySearch(whiteList, from) < 0) {
        // The sender is not on whitelist so let's check the content
        String body = packet.getElemCData("/message/body");
        if (body != null && !body.isEmpty()) {
          body = body.toLowerCase();
          for (String word : badWords) {
            if (body.contains(word)) {
              log.finest(prependText + packet.toString(secureLogging));
              ++spamCounter;
              return;
            }
          }
        }
      }
    }
    // Not a SPAM, return it for further processing
    Packet result = packet.swapFromTo();
    addOutPacket(result);
  }

  @Override
  public int processingInThreads() {
    return Runtime.getRuntime().availableProcessors();
  }

  @Override
  public int processingOutThreads() {
    return Runtime.getRuntime().availableProcessors();
  }

  @Override
  public int hashCodeForPacket(Packet packet) {
    if (packet.getElemTo() != null) {
      return packet.getElemTo().hashCode();
    }
    // This should not happen, every packet must have a destination
    // address, but maybe our SPAM checker is used for checking
    // strange kind of packets too....
    if (packet.getElemFrom() != null) {
      return packet.getElemFrom().hashCode();
    }
    // If this really happens on your system you should look carefully
    // at packets arriving to your component and decide a better way
    // to calculate hashCode
    return 1;
  }

  @Override
  public Map<String, Object> getDefaults(Map<String, Object> params) {
    Map<String, Object> defs = super.getDefaults(params);
    defs.put(BAD_WORDS_KEY, badWords);
    defs.put(WHITELIST_KEY, whiteList);
    defs.put(PREPEND_TEXT_KEY, prependText);
    defs.put(SECURE_LOGGING_KEY, secureLogging);
    defs.put(ABUSE_ADDRESS_KEY, abuseAddress);
    defs.put(NOTIFICATION_FREQ_KEY, notificationFrequency);
    return defs;
  }

  @Override
  public void setProperties(Map<String, Object> props) {
    super.setProperties(props);
    badWords = (String[])props.get(BAD_WORDS_KEY);
    whiteList = (String[])props.get(WHITELIST_KEY);
    Arrays.sort(whiteList);
    prependText = (String)props.get(PREPEND_TEXT_KEY);
    secureLogging = (Boolean)props.get(SECURE_LOGGING_KEY);
    abuseAddress = (String)props.get(ABUSE_ADDRESS_KEY);
    notificationFrequency = (Integer)props.get(NOTIFICATION_FREQ_KEY);
  }

  @Override
  public synchronized void everyMinute() {
    super.everyMinute();
    if ((++delayCounter) >= notificationFrequency) {
      addOutPacket(Packet.getMessage(abuseAddress, getComponentId(),
        StanzaType.chat, "Detected spam messages: " + spamCounter,
        "Spam counter", null, newPacketId("spam-")));
      delayCounter = 0;
      spamCounter = 0;
    }
  }

}