Component Implementation - Lesson 3 - Multi-Threading

Multi-core and multi-CPU machines are very common nowadays, especially for an application like the XMPP server you most likely deployed your service on. Your new custom component, however, processes all packets in a single thread.

This matters most when packet processing is CPU-intensive, as it is for spam checking, for example. In such a case you could see a single core or CPU at 100% usage while the other cores or CPUs are idle. Ideally, you want your component to use all available CPUs.

The Tigase API offers a very simple way to execute the component's processPacket(Packet packet) method in multiple threads. The methods int processingOutThreads() and int processingInThreads() return the number of threads assigned to the component. By default each returns just 1, as not all component implementations are prepared to process packets concurrently. By overriding these methods you can return any value you think is appropriate for your implementation. Please note that there are two methods: one sets the number of threads for packets incoming to the component and the other for packets outgoing from it. It used to be a single method, but different components have different needs, and the best performance can be achieved when the outgoing queues have a thread pool separate from the incoming queues. Also, some components only receive packets while others only send them, so assigning an equal number of threads to both could be a waste of resources.

If the packet processing is CPU bound only, you normally want to have as many threads as there are CPUs available:

@Override
public int processingInThreads() {
  return Runtime.getRuntime().availableProcessors();
}
@Override
public int processingOutThreads() {
  return Runtime.getRuntime().availableProcessors();
}

If the processing is I/O bound (network or database) you probably want to have more threads to process requests. It is hard to guess the ideal number of threads on the first try. Instead, you should run a few tests to see how many threads work best for your implementation of the component.
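As a starting point for such tests, one widely used rule of thumb (not something the Tigase API prescribes) scales the number of cores by the ratio of time a thread spends waiting to the time it spends computing. The sketch below is a minimal illustration of that heuristic; the class ThreadCountEstimate, the method estimate and the timing figures are hypothetical, and the result is only an initial guess to refine by measurement.

```java
// Hypothetical helper for illustration only; not part of the Tigase API.
final class ThreadCountEstimate {

  // Sizing heuristic: cores * (1 + waitTime / computeTime).
  // waitTime and computeTime are per-packet averages in the same unit.
  static int estimate(int cores, double waitTime, double computeTime) {
    if (cores < 1 || waitTime < 0 || computeTime <= 0) {
      throw new IllegalArgumentException("invalid input");
    }
    long threads = Math.round(cores * (1.0 + waitTime / computeTime));
    return (int) Math.max(1L, threads);
  }

  public static void main(String[] args) {
    int cores = Runtime.getRuntime().availableProcessors();
    // Assumed measurements: 8 ms waiting on a database, 2 ms of CPU work.
    System.out.println("I/O bound estimate: " + estimate(cores, 8.0, 2.0));
    // No waiting at all reduces to one thread per core.
    System.out.println("CPU bound estimate: " + estimate(cores, 0.0, 2.0));
  }
}
```

A value computed this way could be returned from processingInThreads() and then adjusted up or down based on test results.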

Now you have many threads for processing your packets, but there is one slight problem with this. In many cases packet order is essential. If our processPacket(…) method is executed concurrently by a few threads, it is quite possible that a message sent to a user can overtake a message sent earlier, especially if the first message was large and the second was small. We can prevent this by adjusting the method responsible for packet distribution among threads.

The algorithm for packet distribution among threads is very simple:

int thread_idx = hashCodeForPacket(packet) % threads_total;

So the key here is the hashCodeForPacket(…) method. By overriding it we can make sure that all packets addressed to the same user will always be processed by the same thread:

@Override
public int hashCodeForPacket(Packet packet) {
  if (packet.getElemTo() != null) {
    return packet.getElemTo().hashCode();
  }
  // This should not happen, every packet must have a destination
  // address, but maybe our SPAM checker is used for checking
  // strange kind of packets too....
  if (packet.getElemFrom() != null) {
    return packet.getElemFrom().hashCode();
  }
  // If this really happens on your system you should look
  // carefully at packets arriving to your component and
  // find a better way to calculate hashCode
  return 1;
}
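The effect of hashing on the destination address can be tried out in isolation. The following is a stand-alone sketch, not Tigase code: the class PacketRouting and the method threadIndex are hypothetical names, and plain strings stand in for packet addresses. It uses Math.floorMod rather than the % operator because a Java hashCode() may be negative, in which case % would produce a negative index; how Tigase itself treats negative hash codes is not covered here.

```java
// Stand-alone illustration; PacketRouting and threadIndex are hypothetical names.
final class PacketRouting {

  // Maps a hash code to a thread index in the range [0, threadsTotal).
  // Math.floorMod never returns a negative value for a positive divisor,
  // unlike the % operator applied to a negative hash code.
  static int threadIndex(int hashCode, int threadsTotal) {
    return Math.floorMod(hashCode, threadsTotal);
  }

  public static void main(String[] args) {
    int threads = 4;
    // Strings stand in for the destination addresses of three packets.
    String[] destinations = {"alice@localhost", "bob@localhost", "alice@localhost"};
    for (String to : destinations) {
      System.out.println(to + " -> thread " + threadIndex(to.hashCode(), threads));
    }
  }
}
```

Both packets addressed to alice@localhost print the same thread index, so they would be processed in the order they arrived.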

The above methods give you control over the number of threads assigned to packet processing in your component and over the packet distribution among those threads. This is not all the Tigase API has to offer in terms of multi-threading.

Sometimes you want to perform some periodic actions. You can of course create a Timer instance and load it with TimerTasks. However, since this need may arise at every level of the class hierarchy, you could end up with multiple Timer objects (in fact, threads) doing similar jobs and consuming resources. A few methods allow you to reuse a common Timer object to perform all sorts of actions instead.

First, you have three methods allowing you to perform some periodic actions:

public synchronized void everySecond();
public synchronized void everyMinute();
public synchronized void everyHour();

An example implementation for periodic notifications sent to some address could look like this one:

@Override
public synchronized void everyMinute() {
  super.everyMinute();
  if ((++delayCounter) >= notificationFrequency) {
    addOutPacket(Packet.getMessage(abuseAddress, getComponentId(),
      StanzaType.chat, "Detected spam messages: " + spamCounter,
      "Spam counter", null, newPacketId("spam-")));
    delayCounter = 0;
    spamCounter = 0;
  }
}

This method sends a message to abuseAddress every notificationFrequency minutes, reporting how many spam messages have been detected during the last period. Please note that you have to call super.everyMinute() to make sure other actions are executed as well, and you should keep processing in this method to a minimum, especially if you override the everySecond() method.
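One point to keep in mind once the component runs with more than one processing thread: a counter such as spamCounter may be incremented from several threads while everyMinute() reads and resets it, and the ++ operator on a plain long field is not atomic in Java. A minimal sketch of one way to handle this with java.util.concurrent.atomic.AtomicLong follows; the class SpamStats and its methods recordSpam, drain and hammer are hypothetical names used only for this illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-alone class; in a component the AtomicLong would
// take the place of a plain long counter field.
final class SpamStats {

  private final AtomicLong spamCounter = new AtomicLong();

  // Safe to call from any number of packet-processing threads.
  void recordSpam() {
    spamCounter.incrementAndGet();
  }

  // Returns the current count and resets it to zero in one atomic step,
  // so no increment is lost between the read and the reset.
  long drain() {
    return spamCounter.getAndSet(0);
  }

  // Simulates several processing threads reporting spam concurrently.
  static void hammer(SpamStats stats, int threads, int perThread) {
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> {
        for (int j = 0; j < perThread; j++) {
          stats.recordSpam();
        }
      });
      workers[i].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  public static void main(String[] args) {
    SpamStats stats = new SpamStats();
    hammer(stats, 4, 1000);
    System.out.println("Detected spam messages: " + stats.drain());
  }
}
```

In a periodic method, the value returned by drain() would be used to build the notification text, replacing the separate read and reset of the counter.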

There are also two methods which allow you to schedule tasks to be executed at a certain time. They are very similar to the java.util.Timer API, the only difference being that the Timer is reused across all levels of the class hierarchy. There is a separate Timer for each class instance though, to avoid interference between separate components:

addTimerTask(TimerTask task, long delay, TimeUnit unit);
addTimerTask(TimerTask task, long delay);
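The idea of funnelling all scheduled work through one Timer per instance can be illustrated with plain java.util.Timer. The sketch below is not the Tigase implementation: SharedTimerSketch, stop and runDemo are hypothetical names, the two addTimerTask methods only mimic the shape of the signatures above, and treating the delay of the two-argument variant as milliseconds is an assumption made for this example.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in showing one Timer shared by all scheduled tasks.
final class SharedTimerSketch {

  // A single daemon Timer thread serves every task added to this object.
  private final Timer timer = new Timer("shared-timer", true);

  void addTimerTask(TimerTask task, long delay, TimeUnit unit) {
    timer.schedule(task, unit.toMillis(delay));
  }

  // Assumption for this sketch: the delay is given in milliseconds.
  void addTimerTask(TimerTask task, long delayMillis) {
    timer.schedule(task, delayMillis);
  }

  void stop() {
    timer.cancel();
  }

  // Schedules two one-shot tasks on the shared Timer and waits for both.
  static boolean runDemo() {
    SharedTimerSketch sketch = new SharedTimerSketch();
    CountDownLatch done = new CountDownLatch(2);
    sketch.addTimerTask(new TimerTask() {
      @Override
      public void run() {
        done.countDown();
      }
    }, 20);
    sketch.addTimerTask(new TimerTask() {
      @Override
      public void run() {
        done.countDown();
      }
    }, 50, TimeUnit.MILLISECONDS);
    try {
      return done.await(2, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      sketch.stop();
    }
  }

  public static void main(String[] args) {
    System.out.println("Both tasks ran: " + runDemo());
  }
}
```

Because every task runs on the same Timer thread, a long-running task delays the ones scheduled after it, which is one more reason to keep scheduled work short.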

There is one more method which is not directly related to multi-threading but can be very helpful for executing some actions at a very specific point in time: the moment when the server has just been initialized, that is, when all components have been created and have received their configuration for the first time. When this happens, Tigase calls the void initializationCompleted() method on each server component. You can override this method to execute some actions once you are sure that Tigase has started and is fully functional.

Here is the code of an example component which uses all of the API discussed in this article:

import java.util.Arrays;
import java.util.Map;
import java.util.logging.Logger;
import tigase.server.AbstractMessageReceiver;
import tigase.server.Packet;
import tigase.util.JIDUtils;
import tigase.xmpp.StanzaType;

public class TestComponent extends AbstractMessageReceiver {

  private static final Logger log =
    Logger.getLogger(TestComponent.class.getName());

  private static final String BAD_WORDS_KEY = "bad-words";
  private static final String WHITELIST_KEY = "white-list";
  private static final String PREPEND_TEXT_KEY = "log-prepend";
  private static final String SECURE_LOGGING_KEY = "secure-logging";
  private static final String ABUSE_ADDRESS_KEY = "abuse-address";
  private static final String NOTIFICATION_FREQ_KEY = "notification-freq";

  private String[] badWords = {"word1", "word2", "word3"};
  private String[] whiteList = {"admin@localhost"};
  private String prependText = "Spam detected: ";
  private String abuseAddress = "abuse@localhost";
  private int notificationFrequency = 10;
  private int delayCounter = 0;
  private boolean secureLogging = false;
  private long spamCounter = 0;

  @Override
  public void processPacket(Packet packet) {
    // Is this packet a message?
    if ("message" == packet.getElemName()) {
      String from = JIDUtils.getNodeID(packet.getElemFrom());
      // Is sender on the whitelist?
      if (Arrays.binarySearch(whiteList, from) < 0) {
        // The sender is not on whitelist so let's check the content
        String body = packet.getElemCData("/message/body");
        if (body != null && !body.isEmpty()) {
          body = body.toLowerCase();
          for (String word : badWords) {
            if (body.contains(word)) {
              log.finest(prependText + packet.toString(secureLogging));
              ++spamCounter;
              return;
            }
          }
        }
      }
    }
    // Not a SPAM, return it for further processing
    Packet result = packet.swapFromTo();
    addOutPacket(result);
  }

  @Override
  public int processingInThreads() {
    return Runtime.getRuntime().availableProcessors();
  }

  @Override
  public int processingOutThreads() {
    return Runtime.getRuntime().availableProcessors();
  }

  @Override
  public int hashCodeForPacket(Packet packet) {
    if (packet.getElemTo() != null) {
      return packet.getElemTo().hashCode();
    }
    // This should not happen, every packet must have a destination
    // address, but maybe our SPAM checker is used for checking
    // strange kind of packets too....
    if (packet.getElemFrom() != null) {
      return packet.getElemFrom().hashCode();
    }
    // If this really happens on your system you should look carefully
    // at packets arriving to your component and decide a better way
    // to calculate hashCode
    return 1;
  }

  @Override
  public Map<String, Object> getDefaults(Map<String, Object> params) {
    Map<String, Object> defs = super.getDefaults(params);
    defs.put(BAD_WORDS_KEY, badWords);
    defs.put(WHITELIST_KEY, whiteList);
    defs.put(PREPEND_TEXT_KEY, prependText);
    defs.put(SECURE_LOGGING_KEY, secureLogging);
    defs.put(ABUSE_ADDRESS_KEY, abuseAddress);
    defs.put(NOTIFICATION_FREQ_KEY, notificationFrequency);
    return defs;
  }

  @Override
  public void setProperties(Map<String, Object> props) {
    super.setProperties(props);
    badWords = (String[])props.get(BAD_WORDS_KEY);
    whiteList = (String[])props.get(WHITELIST_KEY);
    Arrays.sort(whiteList);
    prependText = (String)props.get(PREPEND_TEXT_KEY);
    secureLogging = (Boolean)props.get(SECURE_LOGGING_KEY);
    abuseAddress = (String)props.get(ABUSE_ADDRESS_KEY);
    notificationFrequency = (Integer)props.get(NOTIFICATION_FREQ_KEY);
  }

  @Override
  public synchronized void everyMinute() {
    super.everyMinute();
    if ((++delayCounter) >= notificationFrequency) {
      addOutPacket(Packet.getMessage(abuseAddress, getComponentId(),
        StanzaType.chat, "Detected spam messages: " + spamCounter,
        "Spam counter", null, newPacketId("spam-")));
      delayCounter = 0;
      spamCounter = 0;
    }
  }

}