Multi-core and multi-CPU machines are very common nowadays. Your new custom component, however, processes all packets in a single thread.
This matters most when packet processing is CPU-intensive, as with SPAM checking, for example. In such a case you could see a single core/CPU at 100% usage while the other cores/CPUs sit idle. Ideally, you want your component to use all available CPUs.
The Tigase API offers a very simple way to execute the component’s processPacket(Packet packet) method in multiple threads. The methods int processingOutThreads() and int processingInThreads() return the number of threads assigned to the component. By default each returns just '1', as not all component implementations are prepared to process packets concurrently. By overriding these methods you can return any value you think is appropriate for the implementation.
Please note that there are two methods: one sets the number of threads for packets incoming to the component and the other for packets outgoing from the component. It used to be a single method, but different components have different needs, and the best performance is achieved when the outgoing queues have a thread pool separate from the incoming queues. Also, some components only receive packets while others only send them, so assigning an equal number of threads to both could be a waste of resources.
Note
Due to how the Kernel works, you MUST avoid using variables in those methods. If you would like to make this configurable at startup time, you can simply set processing-in-threads and processing-out-threads in your component’s bean configuration.
If the packet processing is purely CPU-bound, you normally want as many threads as there are CPUs available:
@Override
public int processingInThreads() {
    return Runtime.getRuntime().availableProcessors();
}

@Override
public int processingOutThreads() {
    return Runtime.getRuntime().availableProcessors();
}
If the processing is I/O-bound (network or database), you probably want more threads to process requests. It is hard to guess the ideal number of threads on the first try. Instead, you should run a few tests to see how many threads work best for your component’s implementation.
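As a starting point for such tests, one widely used rule of thumb (not part of the Tigase API) scales the thread count by the ratio of time spent waiting to time spent computing. The sketch below is only an illustration under that assumption: the class ThreadCountSketch and the method suggestThreads are hypothetical names, and the timings are values you would have to measure yourself.

```java
/** Hypothetical helper, not part of Tigase: a rule-of-thumb starting point for thread counts. */
final class ThreadCountSketch {

    /**
     * Suggests threads = cpus * (1 + waitMillis / computeMillis).
     * waitMillis is the average time a packet spends blocked on I/O,
     * computeMillis the average time it spends on the CPU.
     */
    static int suggestThreads(int cpus, double waitMillis, double computeMillis) {
        if (cpus < 1 || waitMillis < 0 || computeMillis <= 0) {
            throw new IllegalArgumentException("invalid arguments");
        }
        return (int) Math.max(1, Math.round(cpus * (1.0 + waitMillis / computeMillis)));
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // Purely CPU-bound work: no waiting, so one thread per CPU.
        System.out.println("CPU-bound: " + suggestThreads(cpus, 0, 5));
        // Work that waits on a database four times longer than it computes.
        System.out.println("I/O-bound: " + suggestThreads(cpus, 20, 5));
    }
}
```

The suggested number is only a first guess to feed into your tests, for example through the processing-in-threads and processing-out-threads settings mentioned above.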
Now you have many threads processing your packets, but there is one slight problem with this. In many cases packet order is essential. If our processPacket(…) method is executed concurrently by several threads, it is quite possible that a message sent to a user overtakes a message sent earlier, especially if the first message was large and the second was small. We can prevent this by adjusting the method responsible for distributing packets among threads.
The algorithm for distributing packets among threads is very simple:
int thread_idx = hashCodeForPacket(packet) % threads_total;
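The distribution step can be tried out in isolation. The sketch below is a standalone illustration, not Tigase's internal code, and the class and method names are hypothetical. It uses Math.floorMod because Java's plain % operator yields a negative result for a negative hash code, which would not be a valid thread index. How Tigase itself guards against negative hash codes is not shown in this article.

```java
/** Hypothetical standalone illustration of hash-based thread selection. */
final class ThreadIndexSketch {

    /** Maps a hash code onto a thread index in the range [0, threadsTotal). */
    static int threadIndex(int hashCode, int threadsTotal) {
        return Math.floorMod(hashCode, threadsTotal);
    }

    public static void main(String[] args) {
        int threads = 4;
        String[] addresses = {"alice@example.com", "bob@example.com", "alice@example.com"};
        for (String address : addresses) {
            // Equal addresses have equal hash codes, so they always land on the same thread.
            System.out.println(address + " -> thread " + threadIndex(address.hashCode(), threads));
        }
    }
}
```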
So the key here is the hashCodeForPacket(…) method. By overriding it we can make sure that all packets addressed to the same user are always processed by the same thread:
@Override
public int hashCodeForPacket(Packet packet) {
    if (packet.getElemTo() != null) {
        return packet.getElemTo().hashCode();
    }
    // This should not happen, every packet must have a destination
    // address, but maybe our SPAM checker is used for checking
    // strange kind of packets too....
    if (packet.getElemFrom() != null) {
        return packet.getElemFrom().hashCode();
    }
    // If this really happens on your system you should look
    // carefully at packets arriving to your component and
    // find a better way to calculate hashCode
    return 1;
}
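To see why hashing on the destination address preserves per-user ordering, the idea can be simulated with plain java.util.concurrent classes. This is a sketch under stated assumptions, not how Tigase implements its queues: each "thread" is modelled as a single-threaded executor, messages are simple "recipient:sequence" strings, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical simulation: one single-threaded worker per index, chosen by recipient hash. */
final class OrderedDispatchSketch {

    /** Dispatches "recipient:seq" messages to workers and returns them in processing order. */
    static List<String> dispatch(List<String> messages, int threads) {
        ExecutorService[] workers = new ExecutorService[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
        List<String> processed = Collections.synchronizedList(new ArrayList<>());
        for (String message : messages) {
            String recipient = message.substring(0, message.indexOf(':'));
            // Same recipient -> same hash -> same worker, so its messages stay in order.
            int idx = Math.floorMod(recipient.hashCode(), threads);
            workers[idx].execute(() -> processed.add(message));
        }
        try {
            for (ExecutorService worker : workers) {
                worker.shutdown();
                worker.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return processed;
    }

    /** Extracts, in processing order, the messages addressed to one recipient. */
    static List<String> forRecipient(List<String> processed, String recipient) {
        List<String> result = new ArrayList<>();
        for (String message : processed) {
            if (message.startsWith(recipient + ":")) {
                result.add(message);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> in = List.of("alice:1", "bob:1", "alice:2", "bob:2", "alice:3");
        List<String> out = dispatch(in, 3);
        System.out.println("alice: " + forRecipient(out, "alice"));
        System.out.println("bob:   " + forRecipient(out, "bob"));
    }
}
```

Messages for different recipients may interleave in any order, but the messages for any single recipient are processed in the order they were submitted.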
The above two methods give you control over the number of threads assigned to packet processing in your component and over the distribution of packets among those threads. This is not all the Tigase API has to offer in terms of multi-threading.
Sometimes you want to perform periodic actions. You can, of course, create a Timer instance and load it with TimerTasks. But because this need can arise at every level of the class hierarchy, you could end up with multiple Timer objects (threads, in fact) doing similar jobs and consuming resources. There are a few methods which allow you to reuse a common Timer object to perform all sorts of actions.
First, there are three methods that allow you to perform periodic actions:
public synchronized void everySecond();
public synchronized void everyMinute();
public synchronized void everyHour();
An example implementation for periodic notifications sent to some address could look like this one:
@Override
public synchronized void everyMinute() {
    super.everyMinute();
    if ((++delayCounter) >= notificationFrequency) {
        addOutPacket(Packet.getMessage(abuseAddress, getComponentId(), StanzaType.chat,
                                       "Detected spam messages: " + spamCounter,
                                       "Spam counter", null, newPacketId("spam-")));
        delayCounter = 0;
        spamCounter = 0;
    }
}
Every notificationFrequency minutes, this method sends a message to abuseAddress reporting how many spam messages were detected during the last period. Please note that you have to call super.everyMinute() to make sure other actions are executed as well, and you must keep the processing in this method to a minimum, especially if you override the everySecond() method.
There is also a method which allows you to schedule tasks to be executed at a certain time; it is very similar to the java.util.Timer API. The only difference is that it uses a ScheduledExecutorService as a backend, which is reused across all levels of the class hierarchy. There is a separate ScheduledExecutorService for each class instance, though, to avoid interference between separate components:
addTimerTask(tigase.util.TimerTask task, long delay);
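The shape of tigase.util.TimerTask is not shown in this article, so the following sketch uses only java.util.concurrent to illustrate the underlying idea: one shared ScheduledExecutorService per component instance running delayed one-off tasks, instead of a separate java.util.Timer at every level of the class hierarchy. The class name, the addTask helper, and the choice of milliseconds as the delay unit are assumptions made for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a component that owns one shared scheduler. */
final class SharedSchedulerSketch {

    // One scheduler shared by every task of this component instance.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /** Runs the task once after the given delay, similar in spirit to addTimerTask(task, delay). */
    void addTask(Runnable task, long delayMillis) {
        scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdown();
    }

    /** Schedules count tasks on the shared scheduler and returns how many ran within the timeout. */
    static int runDemo(int count) {
        SharedSchedulerSketch component = new SharedSchedulerSketch();
        AtomicInteger executed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            component.addTask(() -> {
                executed.incrementAndGet();
                done.countDown();
            }, 10L * (i + 1));
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            component.shutdown();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks executed: " + runDemo(3));
    }
}
```

All three tasks run on the same scheduler thread, which is the resource saving the shared backend is meant to provide.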
Here is the code of an example component and module which use all the API discussed in this article:
Example component code.
public class TestComponent extends AbstractKernelBasedComponent {

    private static final Logger log = Logger.getLogger(TestComponent.class.getName());

    @Inject
    private TestModule testModule;

    @Override
    public synchronized void everyMinute() {
        super.everyMinute();
        testModule.everyMinute();
    }

    @Override
    public String getComponentVersion() {
        String version = this.getClass().getPackage().getImplementationVersion();
        return version == null ? "0.0.0" : version;
    }

    @Override
    public int hashCodeForPacket(Packet packet) {
        if (packet.getElemTo() != null) {
            return packet.getElemTo().hashCode();
        }
        // This should not happen, every packet must have a destination
        // address, but maybe our SPAM checker is used for checking
        // strange kind of packets too....
        if (packet.getElemFrom() != null) {
            return packet.getElemFrom().hashCode();
        }
        // If this really happens on your system you should look carefully
        // at packets arriving to your component and decide a better way
        // to calculate hashCode
        return 1;
    }

    @Override
    public boolean isDiscoNonAdmin() {
        return false;
    }

    @Override
    public int processingInThreads() {
        return Runtime.getRuntime().availableProcessors();
    }

    @Override
    public int processingOutThreads() {
        return Runtime.getRuntime().availableProcessors();
    }

    @Override
    protected void registerModules(Kernel kernel) {
        // here we need to register modules responsible for processing packets
    }
}
Example module code.
@Bean(name = "test-module", parent = TestComponent.class, active = true)
public static class TestModule extends AbstractModule {

    private static final Logger log = Logger.getLogger(TestModule.class.getCanonicalName());

    private Criteria CRITERIA = ElementCriteria.name("message");

    @ConfigField(desc = "Bad words", alias = "bad-words")
    private String[] badWords = {"word1", "word2", "word3"};
    // Looked up with Arrays.binarySearch(), so this array must be kept sorted
    @ConfigField(desc = "White listed addresses", alias = "white-list")
    private String[] whiteList = {"admin@localhost"};
    @ConfigField(desc = "Logged packet types", alias = "packet-types")
    private String[] packetTypes = {"message", "presence", "iq"};
    @ConfigField(desc = "Prefix", alias = "log-prepend")
    private String prependText = "Spam detected: ";
    @ConfigField(desc = "Secure logging", alias = "secure-logging")
    private boolean secureLogging = false;
    @ConfigField(desc = "Abuse notification address", alias = "abuse-address")
    private JID abuseAddress = JID.jidInstanceNS("abuse@localhost");
    @ConfigField(desc = "Frequency of notification", alias = "notification-frequency")
    private int notificationFrequency = 10;

    private int delayCounter = 0;
    private long spamCounter = 0;

    @Inject
    private TestComponent component;

    // Called once a minute by TestComponent.everyMinute()
    public void everyMinute() {
        if ((++delayCounter) >= notificationFrequency) {
            write(Message.getMessage(abuseAddress, component.getComponentId(), StanzaType.chat,
                                     "Detected spam messages: " + spamCounter,
                                     "Spam counter", null, component.newPacketId("spam-")));
            delayCounter = 0;
            spamCounter = 0;
        }
    }

    @Override
    public Criteria getModuleCriteria() {
        return CRITERIA;
    }

    // Rebuilds the module criteria whenever the configured packet types change
    public void setPacketTypes(String[] packetTypes) {
        this.packetTypes = packetTypes;
        Criteria crit = new Or();
        for (String packetType : packetTypes) {
            crit.add(ElementCriteria.name(packetType));
        }
        CRITERIA = crit;
    }

    @Override
    public void process(Packet packet) throws ComponentException, TigaseStringprepException {
        // Is this packet a message?
        if ("message".equals(packet.getElemName())) {
            String from = packet.getStanzaFrom().toString();
            // Is sender on the whitelist?
            if (Arrays.binarySearch(whiteList, from) < 0) {
                // The sender is not on whitelist so let's check the content
                String body = packet.getElemCDataStaticStr(Message.MESSAGE_BODY_PATH);
                if (body != null && !body.isEmpty()) {
                    body = body.toLowerCase();
                    for (String word : badWords) {
                        if (body.contains(word)) {
                            log.finest(prependText + packet.toString(secureLogging));
                            ++spamCounter;
                            return;
                        }
                    }
                }
            }
        }
        // Not a SPAM, return it for further processing
        Packet result = packet.swapFromTo();
        write(result);
    }
}