Component Implementation - Lesson 3 - Multi-Threading

Multi-core and multi-CPU machines are very common nowadays. Your new custom component, however, processes all packets in a single thread.

This matters most when packet processing is CPU-intensive, as it is for SPAM checking, for example. In such a case you could see a single core/CPU at 100% usage while the other cores/CPUs sit idle. Ideally, you want your component to use all available CPUs.

The Tigase API offers a very simple way to execute the component’s processPacket(Packet packet) method in multiple threads. The methods int processingOutThreads() and int processingInThreads() return the number of threads assigned to the component. By default each returns just '1', as not all component implementations are prepared to process packets concurrently. By overriding these methods you can return any value you think is appropriate for the implementation. Please note that there are two methods: one sets the number of threads for packets incoming to the component and the other for packets outgoing from the component. It used to be a single method, but different components have different needs, and the best performance can be achieved when the outgoing queues have a thread pool separate from the incoming queues. Also, some components only receive packets while others only send them, so assigning an equal number of threads to both could be a waste of resources.

Note

Because of how the Kernel works, you MUST avoid using variables in those methods. If you would like the thread counts to be configurable at startup time, you can simply set processing-in-threads and processing-out-threads in your component’s bean configuration.

If the packet processing is CPU bound only, you normally want to have as many threads as there are CPUs available:

@Override
public int processingInThreads() {
  return Runtime.getRuntime().availableProcessors();
}
@Override
public int processingOutThreads() {
  return Runtime.getRuntime().availableProcessors();
}

If the processing is I/O bound (network or database), you probably want more threads than CPUs, because threads waiting on I/O do not use the CPU. It is hard to guess the ideal number of threads on the first try. Instead, you should run a few tests to see how many threads work best for your component's implementation.
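As a starting point for such tests, a common rule of thumb multiplies the core count by one plus the ratio of time spent waiting to time spent computing. The following is a minimal sketch of that heuristic only; ThreadCountEstimate and its parameters are hypothetical names and not part of the Tigase API, and the timings have to come from your own measurements.

```java
// Hypothetical helper, not part of the Tigase API.
final class ThreadCountEstimate {

  // Rule of thumb: threads = cores * (1 + waitTime / computeTime).
  // A purely CPU-bound task (waitMillis == 0) yields exactly the core count.
  static int estimate(int cores, double waitMillis, double computeMillis) {
    if (cores < 1 || waitMillis < 0 || computeMillis <= 0) {
      throw new IllegalArgumentException("cores >= 1, wait >= 0 and compute > 0 are required");
    }
    return (int) Math.ceil(cores * (1.0 + waitMillis / computeMillis));
  }

  public static void main(String[] args) {
    int cores = Runtime.getRuntime().availableProcessors();
    // Assumed measurements: ~30 ms waiting on a database, ~10 ms of CPU per packet.
    System.out.println("Suggested starting point: " + estimate(cores, 30, 10));
  }
}
```

The result is only a first value to try, for example through the processing-in-threads setting; measured throughput should decide the final number.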

Now you have many threads for processing your packets, but there is one slight problem with this. In many cases packet order is essential. If our processPacket(…) method is executed concurrently by a few threads, it is quite possible that a message sent to a user can overtake a message sent earlier, especially if the first message was large and the second was small. We can prevent this by adjusting the method responsible for packet distribution among threads.

The algorithm for distributing packets among threads is very simple:

int thread_idx = hashCodeForPacket(packet) % threads_total;

So the key here is the hashCodeForPacket(…) method. By overriding it we can make sure that all packets addressed to the same user will always be processed by the same thread:

@Override
public int hashCodeForPacket(Packet packet) {
  if (packet.getElemTo() != null) {
    return packet.getElemTo().hashCode();
  }
  // This should not happen, every packet must have a destination
  // address, but maybe our SPAM checker is used for checking
  // strange kind of packets too....
  if (packet.getElemFrom() != null) {
    return packet.getElemFrom().hashCode();
  }
  // If this really happens on your system you should look
  // carefully at packets arriving to your component and
  // find a better way to calculate hashCode
  return 1;
}
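To see why a stable hash code keeps packets for one destination on one thread, here is a small self-contained sketch of the modulo distribution described above. PacketDistributionSketch and threadIndex are hypothetical names. The sketch uses Math.floorMod because hashCode() can be negative and Java's % operator would then give a negative index; this is an assumption made for the sketch and not a statement about how Tigase handles negative values internally.

```java
// Hypothetical illustration, not Tigase code.
final class PacketDistributionSketch {

  // Maps a hash code to a thread index in the range [0, threadsTotal).
  // Math.floorMod keeps the index non-negative even for negative hash codes.
  static int threadIndex(int hashCode, int threadsTotal) {
    return Math.floorMod(hashCode, threadsTotal);
  }

  public static void main(String[] args) {
    int threadsTotal = 4;
    // Plain strings stand in for destination addresses here.
    String[] destinations = {
      "alice@example.com", "bob@example.com", "alice@example.com", "carol@example.com"
    };
    for (String to : destinations) {
      // Equal destinations always print the same thread index.
      System.out.println(to + " -> thread " + threadIndex(to.hashCode(), threadsTotal));
    }
  }
}
```

Because the index depends only on the hash code, both packets for alice@example.com land on the same thread and therefore keep their relative order.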

The above two methods give you control over the number of threads assigned to packet processing in your component and over the distribution of packets among those threads. This is not all the Tigase API has to offer in terms of multi-threading.

Sometimes you want to perform some periodic actions. You can of course create a Timer instance and load it with TimerTasks. However, because this need is common, every level of the class hierarchy could end up with its own Timer objects (threads, in fact) doing similar jobs and consuming resources. There are a few methods which allow you to reuse a common Timer object to perform all sorts of actions.

First, you have three methods allowing you to perform some periodic actions:

public synchronized void everySecond();
public synchronized void everyMinute();
public synchronized void everyHour();

An example implementation for periodic notifications sent to some address could look like this one:

@Override
public synchronized void everyMinute() {
  super.everyMinute();
  if ((++delayCounter) >= notificationFrequency) {
    addOutPacket(Packet.getMessage(abuseAddress, getComponentId(),
      StanzaType.chat, "Detected spam messages: " + spamCounter,
      "Spam counter", null, newPacketId("spam-")));
    delayCounter = 0;
    spamCounter = 0;
  }
}

Every notificationFrequency minutes, this method sends a message to abuseAddress reporting how many spam messages have been detected during the last period. Please note that you have to call super.everyMinute() to make sure other actions are executed as well. Remember also to keep processing in this method to a minimum, especially if you override the everySecond() method.
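One point worth considering once processing runs in several threads: if a counter such as spamCounter is incremented from the packet processing threads and reset from the periodic method, a plain long field can lose updates, and the synchronized keyword on everyMinute() does not protect increments made elsewhere. The following is a minimal sketch of one possible approach using java.util.concurrent.atomic.AtomicLong; SpamCounterSketch, recordSpam and drain are hypothetical names, not Tigase API.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration, not Tigase code.
final class SpamCounterSketch {

  private final AtomicLong spamCounter = new AtomicLong();

  // May be called from any processing thread when spam is detected.
  void recordSpam() {
    spamCounter.incrementAndGet();
  }

  // Called from the periodic task: reads the count and resets it to zero
  // in a single atomic step, so no concurrent increment is lost.
  long drain() {
    return spamCounter.getAndSet(0);
  }

  public static void main(String[] args) throws InterruptedException {
    SpamCounterSketch counter = new SpamCounterSketch();
    Thread[] workers = new Thread[4];
    for (int i = 0; i < workers.length; i++) {
      workers[i] = new Thread(() -> {
        for (int n = 0; n < 10_000; n++) {
          counter.recordSpam();
        }
      });
      workers[i].start();
    }
    for (Thread worker : workers) {
      worker.join();
    }
    // Prints "Detected spam messages: 40000"
    System.out.println("Detected spam messages: " + counter.drain());
  }
}
```

In a component, the periodic method would call something like drain() and put the returned value into the notification message.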

There is also a method which allows you to schedule tasks to be executed at a certain time; it is very similar to the java.util.Timer API. The only difference is that it uses a ScheduledExecutorService as a backend, which is reused across all levels of the class hierarchy. There is a separate ScheduledExecutorService for each class instance though, to avoid interference between separate components:

addTimerTask(tigase.util.TimerTask task, long delay);
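To illustrate the idea of running a one-off delayed task on a shared scheduler, here is a plain-JDK sketch built directly on ScheduledExecutorService. It deliberately does not use tigase.util.TimerTask or addTimerTask(…); DelayedTaskSketch and runAfter are hypothetical names, and the millisecond delay unit is an assumption of this sketch only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration using only the JDK, not Tigase code.
final class DelayedTaskSketch {

  // Schedules the task to run once after delayMillis on the given scheduler.
  // The returned latch is released when the task has finished.
  static CountDownLatch runAfter(ScheduledExecutorService scheduler,
                                 Runnable task, long delayMillis) {
    CountDownLatch done = new CountDownLatch(1);
    scheduler.schedule(() -> {
      try {
        task.run();
      } finally {
        done.countDown();
      }
    }, delayMillis, TimeUnit.MILLISECONDS);
    return done;
  }

  public static void main(String[] args) throws InterruptedException {
    // One scheduler thread shared by all tasks instead of one Timer per class.
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    try {
      CountDownLatch done =
          runAfter(scheduler, () -> System.out.println("cleanup task ran"), 100);
      done.await(1, TimeUnit.SECONDS);
    } finally {
      scheduler.shutdown();
    }
  }
}
```

Sharing one scheduler keeps the number of timer threads constant no matter how many classes schedule work on it.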

Here is the code of an example component and module which use all the API discussed in this article:

Example component code. 

public class TestComponent extends AbstractKernelBasedComponent {

  private static final Logger log = Logger.getLogger(TestComponent.class.getName());

  @Inject
  private TestModule testModule;

  @Override
  public synchronized void everyMinute() {
    super.everyMinute();
    testModule.everyMinute();
  }

  @Override
  public String getComponentVersion() {
    String version = this.getClass().getPackage().getImplementationVersion();
    return version == null ? "0.0.0" : version;
  }

  @Override
  public int hashCodeForPacket(Packet packet) {
    if (packet.getElemTo() != null) {
      return packet.getElemTo().hashCode();
    }
    // This should not happen, every packet must have a destination
    // address, but maybe our SPAM checker is used for checking
    // strange kind of packets too....
    if (packet.getElemFrom() != null) {
      return packet.getElemFrom().hashCode();
    }
    // If this really happens on your system you should look carefully
    // at packets arriving to your component and decide a better way
    // to calculate hashCode
    return 1;
  }

  @Override
  public boolean isDiscoNonAdmin() {
    return false;
  }

  @Override
  public int processingInThreads() {
    return Runtime.getRuntime().availableProcessors();
  }

  @Override
  public int processingOutThreads() {
    return Runtime.getRuntime().availableProcessors();
  }

  @Override
  protected void registerModules(Kernel kernel) {
    // here we need to register modules responsible for processing packets
  }

}

Example module code. 

@Bean(name = "test-module", parent = TestComponent.class, active = true)
public static class TestModule extends AbstractModule {

  private static final Logger log = Logger.getLogger(TestModule.class.getCanonicalName());

  private Criteria CRITERIA = ElementCriteria.name("message");

  @ConfigField(desc = "Bad words", alias = "bad-words")
  private String[] badWords = {"word1", "word2", "word3"};
  @ConfigField(desc = "White listed addresses", alias = "white-list")
  private String[] whiteList = {"admin@localhost"};
  @ConfigField(desc = "Logged packet types", alias = "packet-types")
  private String[] packetTypes = {"message", "presence", "iq"};
  @ConfigField(desc = "Prefix", alias = "log-prepend")
  private String prependText = "Spam detected: ";
  @ConfigField(desc = "Secure logging", alias = "secure-logging")
  private boolean secureLogging = false;
  @ConfigField(desc = "Abuse notification address", alias = "abuse-address")
  private JID abuseAddress = JID.jidInstanceNS("abuse@localhost");
  @ConfigField(desc = "Frequency of notification", alias = "notification-frequency")
  private int notificationFrequency = 10;
  private int delayCounter = 0;
  private long spamCounter = 0;

  @Inject
  private TestComponent component;

  public void everyMinute() {
    if ((++delayCounter) >= notificationFrequency) {
      write(Message.getMessage(abuseAddress, component.getComponentId(), StanzaType.chat,
                               "Detected spam messages: " + spamCounter, "Spam counter", null,
                               component.newPacketId("spam-")));
      delayCounter = 0;
      spamCounter = 0;
    }
  }

  @Override
  public Criteria getModuleCriteria() {
    return CRITERIA;
  }

  public void setPacketTypes(String[] packetTypes) {
    this.packetTypes = packetTypes;
    Criteria crit = new Or();
    for (String packetType : packetTypes) {
      crit.add(ElementCriteria.name(packetType));
    }
    CRITERIA = crit;
  }

  @Override
  public void process(Packet packet) throws ComponentException, TigaseStringprepException {
    // Is this packet a message?
    if ("message".equals(packet.getElemName())) {
      String from = packet.getStanzaFrom().toString();
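      // Is sender on the whitelist?
      // Note: Arrays.binarySearch() only works correctly on a sorted array,
      // so whiteList must be kept sorted whenever it is configured.
      if (Arrays.binarySearch(whiteList, from) < 0) {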
        // The sender is not on whitelist so let's check the content
        String body = packet.getElemCDataStaticStr(Message.MESSAGE_BODY_PATH);
        if (body != null && !body.isEmpty()) {
          body = body.toLowerCase();
          for (String word : badWords) {
            if (body.contains(word)) {
              log.finest(prependText + packet.toString(secureLogging));
              ++spamCounter;
              return;
            }
          }
        }
      }
    }
    // Not a SPAM, return it for further processing
    Packet result = packet.swapFromTo();
    write(result);
  }
}