7. 组件开发

Tigase中的组件是具有自己的JID地址的实体。它可以接收数据包,处理它们,也可以生成数据包。

最著名的组件的一个示例是MUC或PubSub。然而,在Tigase中,几乎所有东西实际上都是一个组件:会话管理器、s2s 连接管理器、消息路由器等……组件是根据服务器配置加载的,新组件可以在运行时加载和激活。您可以轻松替换组件实现,唯一需要更改的是配置条目中的类名。

为Tigase服务器创建组件是服务器开发的重要组成部分,因此有很多有用的API和现成可用的代码。本指南将帮助您熟悉API 以及如何快速有效地创建自己的组件实现。

  1. 组件实现 - 第 1 课 - 基础

  2. 组件实现 - 第 2 课 - 配置

  3. 组件实现 - 第 3 课 - 多线程

  4. 组件实现 - 第 4 课 - 服务发现

  5. 组件实现 - 第 5 课 - 统计

  6. 组件实现 - 第 6 课 - 脚本支持

  7. 组件实现 - 第 7 课 - 数据存储库

  8. 组件实现 - 第 8 课 - 启动时间

  9. 组件中的包过滤

7.1. 组件实现 - 第 1 课 - 基础

创建Tigase组件实际上非常简单,并且可以使用广泛的API,您只需几行代码即可创建功能强大的组件。您可以在其他地方找到详细的API描述。本系列通过代码示例提供实践课程,教授如何使用现有的Tigase API以最简单的代码获得所需的结果。

即使所有Tigase组件只是 ServerComponent 接口的实现,我也会将这种低级别信息保持在必要的最低限度。仅基于接口创建新组件虽然很有可能,但不是很有效。本指南旨在教您如何利用已有的内容,只需最少的编码工作即可使用。

这只是本系列的第一课,我将介绍组件实现的基础知识。

让我们开始创建Tigase组件:

import java.util.logging.Logger;
import tigase.component.AbstractKernelBasedComponent;
import tigase.server.Packet;

public class TestComponent extends AbstractKernelBasedComponent {

  private static final Logger log = Logger.getLogger(TestComponent.class.getName());

  @Override
  public String getComponentVersion() {
    String version = this.getClass().getPackage().getImplementationVersion();
    return version == null ? "0.0.0" : version;
  }

  @Override
  public boolean isDiscoNonAdmin() {
    return false;
  }

  @Override
  protected void registerModules(Kernel kernel) {
    // here we need to register modules responsible for processing packets
  }

}

如您所见,当我们扩展 AbstractKernelBasedComponent 时,我们有3个强制方法:

  • String getComponentVersion() 返回组件的版本以用于日志记录

  • boolean isDiscoNonAdmin() 决定组件是否对服务器管理员以外的用户可见

  • void registerModules(Kernel kernel) 允许您注册负责实际处理数据包的组件模块

小技巧

如果您决定不想使用模块来处理数据包(尽管我们强烈建议使用它们,因为模块组件易于扩展)您可以实现另一种方法 void processPacket(Packet packet) 这将为每个发送到组件的数据包调用。这种方法实际上是合乎逻辑的,因为您的组件的主要任务是处理数据包。

我们的新组件的类名是 TestComponent,我们还为这个类初始化了一个单独的记录器。这样做非常有用,因为它可以让我们轻松找到我们的类创建的日志条目。

使用这几行代码,您就拥有了一个功能齐全的Tigase组件,可以将其加载到Tigase服务器;它可以接收和处理数据包,显示为服务发现列表中的一个元素(仅适用于管理员),响应管理员临时命令,支持脚本,生成统计信息,可以部署为外部组件,以及其他一些事情。

下一个重要步骤是创建负责处理数据包的模块。现在让我们通过将消息附加到日志文件来创建负责处理消息的模块:

@Bean(name = "test-module", parent = TestComponent.class, active = true)
public static class TestModule extends AbstractModule {

  private static final Logger log = Logger.getLogger(TestModule.class.getCanonicalName());

  private static final Criteria CRITERIA = ElementCriteria.name("message");

  @Override
  public Criteria getModuleCriteria() {
    return CRITERIA;
  }

  @Override
  public void process(Packet packet) throws ComponentException, TigaseStringprepException {
    log.finest("My packet: " + packet.toString());
  }
}

Criteria getModuleCriteria() 返回的 Criteria 类的实例被组件类用来决定包是否应该被这个模块处理。在这种情况下,我们返回的实例匹配任何作为 消息 的数据包。

最后我们有一个非常重要的方法 void process(Packet packet),它是组件的主要处理方法。如果组件将接收到与模块返回的标准匹配的数据包 - 将调用此方法。

但是我们如何从模块发送数据包呢? AbstractModule 包含方法 void write(Packet packet),您可以使用该方法从组件发送数据包。

在我们进一步实现之前,让我们在Tigase服务器中配置组件,以便在下次服务器启动时加载它。假设我们的 init.tdsl 文件看起来像这样:

'config-type' = 'default'
'debug' = ['server']
'default-virtual-host' = [ 'devel.tigase.org' ]
admins = [ '[email protected]' ]
dataSource {
  default () {
    uri = 'jdbc:derby:/Tigase/tigasedb'
  }
}
muc() {}
pubsub() {}

我们可以看到它已经配置为加载另外两个组件:MUCPubSub。让我们通过在属性文件中附加以下行来添加第三个 - 我们的新组件到配置文件:

test(class: TestComponent) {}

现在我们必须重新启动服务器。

有几种方法可以检查我们的组件是否已加载到服务器。可能最简单的方法是从管理员帐户连接到服务器并查看服务发现列表。

service disco test comp admin 300

如果一切顺利,您应该会在列表中看到与屏幕截图中突出显示的条目类似的条目。组件描述是’’Undefined description’’,这是一个默认描述,我们可以稍后更改它,组件默认JID是:test@devel.tigase.org,其中 devel.tigase.org 是服务器域,test是组件名称。

确定组件是否已加载的另一种方法是查看日志文件。如果您计划开发Tigase组件,那么熟悉Tigase日志文件将非常有用。因此,让我们查看日志文件 logs/tigase.log.0,如果组件已加载,您应该在日志中找到以下行:

MessageRouter.setProperties() FINER: Loading and registering message receiver: test
MessageRouter.addRouter() INFO: Adding receiver: TestComponent
MessageRouter.addComponent() INFO: Adding component: TestComponent

如果您的组件没有加载,您应该首先检查配置文件。也许Tigase在启动时找不到您的类。确保您的类位于 CLASSPATH 中,或者将您的课程的JAR文件复制到Tigase jars/ 目录。

假设一切顺利并且您的组件已被服务器加载并且它显示在服务发现列表中,如上面的屏幕截图所示,您可以双击它以获取一个包含临时命令列表的窗口 - 管理员脚本。屏幕截图上的一个窗口仅显示了用于添加和删除脚本的两个基本命令,这是一个好的开始。

commands list test 200

此外,您可以在服务发现窗口中浏览服务器统计信息,以在列表中找到您的新测试组件。如果您单击组件,它会显示一个包含组件统计信息的窗口,非常基本的数据包计数器。

service disco stats 200

正如我们只需要几行代码就可以看到的,我们的新组件非常强大,可以在开发人员方面不付出太多努力的情况下做很多事情。

现在,到了最重要的问题的时候了。我们的新组件能否做一些有用的事情,即它可以接收和处理XMPP数据包吗?

让我们试试看。使用您最喜欢的客户端向JID发送消息:test@devel.tigase.org (假设您的服务器配置为 devel.tigase.org 域)。您可以在客户端中使用某种XML控制台,也可以只向组件JID发送一条简单的消息。根据我们在 process(…​) 方法中的代码,它应该记录我们的消息。对于这个测试,我发送了一条主题为:”test message”和正文:”this is a test”的消息。日志文件应包含以下条目:

TestModule.process() FINEST: My packet: to=null, from=null,
data=<message from="[email protected]/devel"
  to="[email protected]" id="abcaa" xmlns="jabber:client">
  <subject>test message</subject>
  <body>this is a test</body>
</message>, XMLNS=jabber:client, priority=NORMAL

如果是这种情况,我们可以确定一切都按预期工作,我们现在要做的就是用一些有用的代码填充 process(…​) 方法。

7.2. 组件实现 - 第 2 课 - 配置

可能很难说出您应该对新组件实现做的第一件事是什么。不同的开发者可能对此有不同的看法。然而,在我看来,为您的组件提供一种配置它并提供一些运行时设置的方法总是一个好主意。

本指南描述了如何向您的组件添加配置处理。

为了演示如何实现组件配置,假设我们要配置组件将记录哪些类型的数据包。有三种可能的数据包类型:messagepresenceiq,我们希望能够配置这三者的任意组合的日志记录。此外,我们还希望能够配置附加到记录消息的文本,并可选择切换安全登录。 (安全日志将所有数据包CData替换为文本:CData 大小:NN 以保护用户隐私。)

让我们在我们的组件 TestModule 中创建以下私有变量:

private String[] packetTypes = {"message", "presence", "iq"};
private String prependText = "My packet: ";
private boolean secureLogging = false;

为了使它们可配置,我们必须使用 @ConfigField 注释来注释它们。它需要 desc 字段(描述配置选项)并具有以下可选属性: * alias - 变量的替代名称 * allowAliasFromParent - 指定来自父bean的别名的值是否应该是allowed * type - 指定字段的一般类型,这会影响日志/”config-dump”文件中值的可能混淆;可能的值: Plain (不更改)、Password (完整的值将被混淆)和 JdbcUrl (只有密码部分将从URL中混淆)

@ConfigField(desc = "Logged packet types", alias = "packet-types")
private String[] packetTypes = {"message", "presence", "iq"};
@ConfigField(desc = "Prefix", alias = "log-prepend")
private String prependText = "My packet: ";
@ConfigField(desc = "Secure logging", alias = "secure-logging")
private boolean secureLogging = false;

就是这样。 Tigase内核将处理这些字段,并在配置更改时更新它们。

config.tdsl 文件中的语法非常简单,在 Admin Guide 中有详细描述。要在 config.tdsl 文件中为您的组件设置配置,您必须将以下行附加到测试组件配置块内的文件中:

test-module {
  log-prepend = 'My packet: '
  packet-types = [ 'message', 'presence', 'iq' ]
  secure-logging = true
}

方括号用于标记我们设置了一个由几个元素组成的列表,请查看 Admin Guide 文档以获取更多详细信息。

这是新组件模块的完整代码,它利用配置设置修改了 process(...​) 方法:

@Bean(name = "test-module", parent = TestComponent.class, active = true)
public static class TestModule extends AbstractModule {

  private static final Logger log = Logger.getLogger(TestModule.class.getCanonicalName());

  private Criteria CRITERIA = ElementCriteria.name("message");

  @ConfigField(desc = "Logged packet types", alias = "packet-types")
  private String[] packetTypes = {"message", "presence", "iq"};
  @ConfigField(desc = "Prefix", alias = "log-prepend")
  private String prependText = "My packet: ";
  @ConfigField(desc = "Secure logging", alias = "secure-logging")
  private boolean secureLogging = false;

  @Override
  public Criteria getModuleCriteria() {
    return CRITERIA;
  }

  public void setPacketTypes(String[] packetTypes) {
    this.packetTypes = packetTypes;
    Criteria crit = new Or();
    for (String packetType : packetTypes) {
      crit.add(ElementCriteria.name(packetType));
    }
    CRITERIA = crit;
  }

  @Override
  public void process(Packet packet) throws ComponentException, TigaseStringprepException {
    log.finest(prependText + packet.toString(secureLogging));
  }
}

当然,我们可以在 process(...​) 方法中进行更有用的数据包处理。这只是一个示例代码。

小技巧

这里我们使用了一个setter setPacketType(String[] packetTypes), 它是一个字段 packetTypes 的setter。 Tigase内核将使用它而不是直接将值分配给一个字段,这放弃了将值转换为不同类型并更新其他字段的机会 - 在我们的例子中,我们更新了 CRITERIA 字段,这将导致数据包类型发生变化方法 void process(…​) 将被调用。

7.3. 组件实现 - 第 3 课 - 多线程

如今,多核和多CPU机器非常普遍。但是,您的新自定义组件在单个线程中处理所有数据包。

如果数据包处理占用大量CPU资源(例如垃圾邮件检查),这一点尤其重要。在这种情况下,您可能会遇到100%的单个核心/CPU使用率,而其他核心/CPU处于空闲状态。理想情况下,您希望您的组件使用所有可用的CPU。

Tigase API提供了一种非常简单的方法来在多个线程中执行组件的 processPacket(Packet packet) 方法。方法 int processingOutThreads()int processingInThreads() 返回分配给组件的线程数。默认情况下,它只返回 ‘1’,因为并非所有组件实现都准备好同时处理数据包。通过覆盖该方法,您可以返回您认为适合实现的任何值。请注意,有两种方法,一种是用于将数据包传入组件的多个线程,另一种是用于从组件传出的数据包。它曾经是一种单一的方法,但不同的组件有不同的需求,当传出队列与传入队列有一个单独的线程池时,可以实现最佳性能。此外,一些组件只接收数据包,而其他组件只发送,因此为两者分配相同数量的线程可能会浪费资源。

备注

由于内核的工作方式,您必须避免在这些方法中使用变量。如果你想在启动时进行配置,你可以简单地在组件的bean配置中设置 processing-in-threadsprocessing-out-threads

如果数据包处理仅受CPU限制,您通常希望拥有与可用CPU一样多的线程:

@Override
public int processingInThreads() {
  return Runtime.getRuntime().availableProcessors();
}
@Override
public int processingOutThreads() {
  return Runtime.getRuntime().availableProcessors();
}

如果处理受I/O限制(网络或数据库),您可能希望有更多线程来处理请求。在第一次尝试时很难猜出理想的线程数。相反,您应该运行一些测试来查看有多少线程最适合组件的实现。

现在你有很多线程来处理你的数据包,但是这有一个小问题。在许多情况下,数据包顺序是必不可少的。如果我们的 processPacket(...​) 方法由几个线程并发执行,那么发送给用户的消息很可能会接管之前发送的消息。特别是如果第一条消息很大而第二条消息很小。我们可以通过调整负责在线程之间分配数据包的方法来防止这种情况。

线程间数据包分发的算法非常简单:

int thread_idx = hashCodeForPacket(packet) % threads_total;

所以这里的关键是使用 hashCodeForPacket(...​) 方法。通过覆盖它,我们可以确保发送给同一用户的所有数据包将始终由同一线程处理:

@Override
public int hashCodeForPacket(Packet packet) {
  if (packet.getElemTo() != null) {
    return packet.getElemTo().hashCode();
  }
  // This should not happen, every packet must have a destination
  // address, but maybe our SPAM checker is used for checking
  // strange kind of packets too....
  if (packet.getStanzaFrom() != null) {
    return packet.getStanzaFrom().hashCode();
  }
  // If this really happens on your system you should look
  // carefully at packets arriving to your component and
  // find a better way to calculate hashCode
  return 1;
}

上述两种方法可以控制分配给组件中数据包处理的线程数以及线程之间的数据包分配。这并不是Tigase API在多线程方面必须提供的全部。

有时你想执行一些周期性的动作。您当然可以创建Timer实例并使用TimerTasks加载它。由于可能需要这样做,因此类层次结构的每一级都可能以多个 Timer(实际上是线程)对象完成类似的工作并使用资源而告终。有一些方法允许您重用通用 Timer对象来执行各种操作。

首先,您有三种方法可以让您执行一些周期性操作:

public synchronized void everySecond();
public synchronized void everyMinute();
public synchronized void everyHour();

发送到某个地址的定期通知的示例实现可能如下所示:

@Override
public synchronized void everyMinute() {
  super.everyMinute();
  if ((++delayCounter) >= notificationFrequency) {
    addOutPacket(Packet.getMessage(abuseAddress, getComponentId(),
      StanzaType.chat, "Detected spam messages: " + spamCounter,
      "Spam counter", null, newPacketId("spam-")));
    delayCounter = 0;
    spamCounter = 0;
  }
}

此方法每 notificationFrequency 分钟向 abuseAddress 发送一条消息,报告在最后一段时间内检测到多少垃圾邮件。请注意,您必须调用 super.everyMinute() 以确保其他操作也被执行,并且您还必须记住将此方法中的处理保持在最低限度,特别是如果您覆盖 everySecond() 方法。

还有一种方法可以让您安排在特定时间执行的任务,它与 java.util.Timer API 非常相似。唯一的区别是我们使用 ScheduledExecutorService 作为后端,它在类层次结构的所有级别中被重用。但是,每个Class实例都有一个单独的 ScheduledExecutorService,以避免不同组件之间的干扰:

addTimerTask(tigase.util.TimerTask task, long delay);

以下是使用本文讨论的所有API的示例组件和模块的代码:

示例组件代码。

public class TestComponent extends AbstractKernelBasedComponent {

  private static final Logger log = Logger.getLogger(TestComponent.class.getName());

  @Inject
  private TestModule testModule;

  @Override
  public synchronized void everyMinute() {
    super.everyMinute();
    testModule.everyMinute();
  }

  @Override
  public String getComponentVersion() {
  String version = this.getClass().getPackage().getImplementationVersion();
    return version == null ? "0.0.0" : version;
  }

  @Override
  public int hashCodeForPacket(Packet packet) {
    if (packet.getElemTo() != null) {
      return packet.getElemTo().hashCode();
    }
    // This should not happen, every packet must have a destination
    // address, but maybe our SPAM checker is used for checking
    // strange kind of packets too....
    if (packet.getStanzaFrom() != null) {
      return packet.getStanzaFrom().hashCode();
    }
    // If this really happens on your system you should look carefully
    // at packets arriving to your component and decide a better way
    // to calculate hashCode
    return 1;
  }

  @Override
  public boolean isDiscoNonAdmin() {
    return false;
  }

  @Override
  public int processingInThreads() {
    return Runtime.getRuntime().availableProcessors();
  }

  @Override
  public int processingOutThreads() {
    return Runtime.getRuntime().availableProcessors();
  }

  @Override
  protected void registerModules(Kernel kernel) {
    // here we need to register modules responsible for processing packets
  }

}

示例模块代码。

@Bean(name = "test-module", parent = TestComponent.class, active = true)
public static class TestModule extends AbstractModule {

  private static final Logger log = Logger.getLogger(TestModule.class.getCanonicalName());

  private Criteria CRITERIA = ElementCriteria.name("message");

  @ConfigField(desc = "Bad words", alias = "bad-words")
  private String[] badWords = {"word1", "word2", "word3"};
  @ConfigField(desc = "White listed addresses", alias = "white-list")
  private String[] whiteList = {"admin@localhost"};
  @ConfigField(desc = "Logged packet types", alias = "packet-types")
  private String[] packetTypes = {"message", "presence", "iq"};
  @ConfigField(desc = "Prefix", alias = "log-prepend")
  private String prependText = "Spam detected: ";
  @ConfigField(desc = "Secure logging", alias = "secure-logging")
  private boolean secureLogging = false;
  @ConfigField(desc = "Abuse notification address", alias = "abuse-address")
  private JID abuseAddress = JID.jidInstanceNS("abuse@locahost");
  @ConfigField(desc = "Frequency of notification", alias = "notification-frequency")
  private int notificationFrequency = 10;
  private int delayCounter = 0;
  private long spamCounter = 0;

  @Inject
  private TestComponent component;

  public void everyMinute() {
    if ((++delayCounter) >= notificationFrequency) {
      write(Message.getMessage(abuseAddress, component.getComponentId(), StanzaType.chat,
                               "Detected spam messages: " + spamCounter, "Spam counter", null,
                               component.newPacketId("spam-")));
      delayCounter = 0;
      spamCounter = 0;
    }
  }

  @Override
  public Criteria getModuleCriteria() {
    return CRITERIA;
  }

  public void setPacketTypes(String[] packetTypes) {
    this.packetTypes = packetTypes;
    Criteria crit = new Or();
    for (String packetType : packetTypes) {
      crit.add(ElementCriteria.name(packetType));
    }
    CRITERIA = crit;
  }

  @Override
  public void process(Packet packet) throws ComponentException, TigaseStringprepException {
    // Is this packet a message?
    if ("message" == packet.getElemName()) {
      String from = packet.getStanzaFrom().toString();
      // Is sender on the whitelist?
      if (Arrays.binarySearch(whiteList, from) < 0) {
        // The sender is not on whitelist so let's check the content
        String body = packet.getElemCDataStaticStr(Message.MESSAGE_BODY_PATH);
        if (body != null && !body.isEmpty()) {
          body = body.toLowerCase();
          for (String word : badWords) {
            if (body.contains(word)) {
              log.finest(prependText + packet.toString(secureLogging));
              ++spamCounter;
              return;
            }
          }
        }
      }
    }
    // Not a SPAM, return it for further processing
    Packet result = packet.swapFromTo();
    write(result);
  }
}

7.4. 组件实现 - 第 4 课 - 服务发现

您的组件仍然在服务发现列表中显示为带有 “Undefined description” 的元素。它也没有提供任何有趣的功能或子节点。

在本文中,我将展示如何以简单的方式更改服务发现列表中显示的基本组件信息,以及如何添加一些服务disco功能。作为一个更高级的功能,该指南将教您在运行时添加/删除服务发现节点以及更新现有元素。

为了让组件正确响应 disco#infodisco#items 请求,你应该在你的组件中注册 DiscoveryModule

@Override
protected void registerModules(Kernel kernel) {
    kernel.registerBean("disco").asClass(DiscoveryModule.class).exec();
}

注意在你的组件中 显式 注册 DiscoveryModule 是很重要的。

可以通过覆盖以下两个方法来更改组件描述和类别类型:

@Override
public String getDiscoDescription() {
  return "Spam filtering";
}

@Override
public String getDiscoCategoryType() {
  return "spam";
}

请注意,Service Discovery Identities registry 中没有定义这样的 ‘spam’ 类别类型。此处仅用作演示。请参阅Service Discovery Identities注册表文档以获取类别和类型的列表,然后选择最适合您的。

添加上述两种方法并使用更新的代码重新启动服务器后,请查看服务发现窗口。您应该会在屏幕截图上看到类似的内容。

spam filtering disco small

现在让我们添加允许我们的模块 TestModule 返回支持的功能的方法。这样,我们的组件将自动报告其所有模块支持的功能。为此,我们需要实现一个方法 String[] getFeatures(),它返回 String 项的数组。此项用于生成组件支持的功能列表。

虽然这很容易,但这种特殊的变化除了视觉外观之外不会影响任何东西。然后让我们进行更高级和更有用的更改。

上述方法的限制之一是您无法使用这些方法在运行时更新或更改组件信息。它们在组件初始化期间仅在创建组件服务发现信息并准备以后使用时调用一次。但是,有时能够在运行时更改服务发现很有用。

在我们简单的垃圾邮件过滤组件中,让我们显示有多少消息已作为服务发现描述字符串的一部分被检出。每次我们收到消息时,我们都可以调用:

updateServiceDiscoveryItem(getName(), null, getDiscoDescription() + ": [" + (++messagesCounter) + "]", true);

一个小的性能说明,在某些情况下调用 updateServiceDiscoveryItem(...​) 可能是一个昂贵的操作,所以一个更好的主意可能不是每次我们收到消息时调用该方法,而是每100次左右调用一次。

第一个参数是服务发现列表上显示的组件 JID。但是,Tigase服务器可能适用于许多虚拟主机,因此主机名部分由较低级别的函数添加,我们在这里只提供组件名称。第二个参数是服务发现节点,对于顶级disco元素,它通常是’null’ 。第三是项目描述(在disco规范中实际上称为 ‘name’)。最后一个参数指定元素是否仅对管理员可见。

spam filter counter small

完整的方法代码如下所示,上面的屏幕截图显示了如果我们应用我们的代码并向组件发送一些消息,我们的组件的服务发现元素会如何变化。

使用该方法,我们还可以将子模式添加到我们的组件元素中。 XMPP服务发现确实不是用于显示应用程序计数器,但本例足以演示Tigase中可用的API,因此我们继续通过服务发现显示我们的计数器。这一次,我们不使用 ‘null’作为节点,而是放置一些有意义的文本,如下例所示:

// This is called whenever a message arrives
// to the component
updateServiceDiscoveryItem(getName(), "messages",
  "Messages processed: [" + (++messagesCounter) + "]", true);
// This is called every time the component detects
// spam message
updateServiceDiscoveryItem(getName(), "spam", "Spam caught: [" +
  (++totalSpamCounter) + "]", true);

同样,请查看下面的完整方法主体以获取完整的代码示例。现在,如果我们向组件发送一些消息,其中一些是垃圾邮件(包含被识别为垃圾邮件的单词),我们可以浏览服务器的服务发现。您的服务发现应该显示一个类似于左侧屏幕截图中的列表。

当然,取决于实现,如果我们仅在处理消息时调用 updateServiceDiscoveryItem(…​) 方法,最初可能在我们的组件元素下没有子节点。为了确保我们组件的子节点从一开始就显示出来,您可以第一次在 setProperties(…​) 中调用它们,以使用初始子节点填充服务发现。

请注意,updateServiceDiscoveryItem(...​) 方法用于添加新项目和更新现有项目。虽然有一个单独的方法可以删除该项目:

void removeServiceDiscoveryItem(String jid,
  String node, String description)

实际上只有两个第一个参数很重要:jidnode 必须对应于现有的、先前创建的服务发现项。

update 方法有两个额外的变体,可以让您更好地控制所创建的服务发现项。项目可以是不同的类别和类型,也可以呈现一组特征。

更简单的是为更新的服务发现项设置一组特征的变体。有一个 文档 描述了现有的注册功能。我们正在创建一个将成为垃圾邮件过滤器的示例,并且没有预定义的垃圾邮件过滤功能,但出于本指南的目的,我们可以发明两个特征识别字符串并将其设置为我们的组件。让我们使用以下参数调用 update 方法:

updateServiceDiscoveryItem(getName(), null, getDiscoDescription(),
  true, "tigase:x:spam-filter", "tigase:x:spam-reporting");

调用此方法的最佳位置是 setProperties(...​) 方法,以便我们的组件在启动时获得正确的服务发现设置。我们为组件disco设置了两个功能:tigase:x:spam-filtertigase:x:spam-reporting。此方法接受一组可变参数,所以我们可以根据需要传递给它尽可能多的特性,或者按照Java规范,我们可以只传递一个 Strings 数组。

使用上述调用更新您的代码,然后重新启动服务器。现在看看组件的服务发现。

最后一个功能对于我们的垃圾邮件过滤组件的情况可能不是很有用,但对于许多其他情况,例如MUC或PubSub,它正在为服务发现项设置适当的类别和类型。有一个文档列出了所有当前注册的服务发现身份(类别和类型)。还有垃圾邮件过滤条目。让我们使用 automation 类别和 spam-filter 类型并为我们的组件设置它:

updateServiceDiscoveryItem(getName(), null, getDiscoDescription(),
  "automation", "spam-filtering", true,
  "tigase:x:spam-filter", "tigase:x:spam-reporting");

当然,所有这些设置都可以应用于任何服务发现创建或更新,包括子节点。这是组件的完整代码:

示例组件代码。

public class TestComponent extends AbstractKernelBasedComponent {

  private static final Logger log = Logger.getLogger(TestComponent.class.getName());

  @Inject
  private TestModule testModule;

  @Override
  public synchronized void everyMinute() {
    super.everyMinute();
    testModule.everyMinute();
  }

  @Override
  public String getComponentVersion() {
  String version = this.getClass().getPackage().getImplementationVersion();
    return version == null ? "0.0.0" : version;
  }

  @Override
  public String getDiscoDescription() {
    return "Spam filtering";
  }

  @Override
  public String getDiscoCategoryType() {
      return "spam";
  }

  @Override
  public int hashCodeForPacket(Packet packet) {
    if (packet.getElemTo() != null) {
      return packet.getElemTo().hashCode();
    }
    // This should not happen, every packet must have a destination
    // address, but maybe our SPAM checker is used for checking
    // strange kind of packets too....
    if (packet.getStanzaFrom() != null) {
      return packet.getStanzaFrom().hashCode();
    }
    // If this really happens on your system you should look carefully
    // at packets arriving to your component and decide a better way
    // to calculate hashCode
    return 1;
  }

  @Override
  public boolean isDiscoNonAdmin() {
    return false;
  }

  @Override
  public int processingInThreads() {
    return Runtime.getRuntime().availableProcessors();
  }

  @Override
  public int processingOutThreads() {
    return Runtime.getRuntime().availableProcessors();
  }

  @Override
  protected void registerModules(Kernel kernel) {
    // here we need to register modules responsible for processing packets
    kernel.registerBean("disco").asClass(DiscoveryModule.class).exec();
  }

}

示例模块代码。

@Bean(name = "test-module", parent = TestComponent.class, active = true)
public static class TestModule extends AbstractModule {

  private static final Logger log = Logger.getLogger(TestModule.class.getCanonicalName());

  private Criteria CRITERIA = ElementCriteria.name("message");
  private String[] FEATURES = { "tigase:x:spam-filter", "tigase:x:spam-reporting" };

  @ConfigField(desc = "Bad words", alias = "bad-words")
  private String[] badWords = {"word1", "word2", "word3"};
  @ConfigField(desc = "White listed addresses", alias = "white-list")
  private String[] whiteList = {"admin@localhost"};
  @ConfigField(desc = "Logged packet types", alias = "packet-types")
  private String[] packetTypes = {"message", "presence", "iq"};
  @ConfigField(desc = "Prefix", alias = "log-prepend")
  private String prependText = "Spam detected: ";
  @ConfigField(desc = "Secure logging", alias = "secure-logging")
  private boolean secureLogging = false;
  @ConfigField(desc = "Abuse notification address", alias = "abuse-address")
  private JID abuseAddress = JID.jidInstanceNS("abuse@locahost");
  @ConfigField(desc = "Frequency of notification", alias = "notification-frequency")
  private int notificationFrequency = 10;
  private int delayCounter = 0;
  private long spamCounter = 0;
  private long totalSpamCounter = 0;
  private long messagesCounter = 0;


  @Inject
  private TestComponent component;

  public void everyMinute() {
    if ((++delayCounter) >= notificationFrequency) {
      write(Message.getMessage(abuseAddress, component.getComponentId(), StanzaType.chat,
                               "Detected spam messages: " + spamCounter, "Spam counter", null,
                               component.newPacketId("spam-")));
      delayCounter = 0;
      spamCounter = 0;
    }
  }

  @Override
  public String[] getFeatures() {
    return FEATURES;
  }

  @Override
  public Criteria getModuleCriteria() {
    return CRITERIA;
  }

  public void setPacketTypes(String[] packetTypes) {
    this.packetTypes = packetTypes;
    Criteria crit = new Or();
    for (String packetType : packetTypes) {
      crit.add(ElementCriteria.name(packetType));
    }
    CRITERIA = crit;
  }

  @Override
  public void process(Packet packet) throws ComponentException, TigaseStringprepException {
    // Is this packet a message?
    if ("message" == packet.getElemName()) {
      component.updateServiceDiscoveryItem(component.getName(), "messages",
                                           "Messages processed: [" + (++messagesCounter) + "]", true);
      String from = packet.getStanzaFrom().toString();
      // Is sender on the whitelist?
      if (Arrays.binarySearch(whiteList, from) < 0) {
        // The sender is not on whitelist so let's check the content
        String body = packet.getElemCDataStaticStr(Message.MESSAGE_BODY_PATH);
        if (body != null && !body.isEmpty()) {
          body = body.toLowerCase();
          for (String word : badWords) {
            if (body.contains(word)) {
              log.finest(prependText + packet.toString(secureLogging));
              ++spamCounter;
              component.updateServiceDiscoveryItem(component.getName(), "spam", "Spam caught: [" +
                                                   (++totalSpamCounter) + "]", true);
              return;
            }
          }
        }
      }
    }
    // Not a SPAM, return it for further processing
    Packet result = packet.swapFromTo();
    write(result);
  }
}

7.5. 组件实现 - 第 5 课 - 统计

在大多数情况下,您会希望从组件中收集一些运行时统计信息,以了解它是如何工作的,检测可能的性能问题或拥塞问题。所有服务器统计信息都公开并可通过XMPP 使用ad-hoc命令访问,HTTP、JMX和一些选定的统计信息也可通过SNMP获得。作为组件开发人员,您不必做任何事情来通过任何这些协议公开您的统计信息,您只需要提供您的统计信息,管理员就可以按照他想要的任何方式访问它们。

本课将教您如何添加自己的统计信息以及如何确保生成的统计信息不会影响应用程序性能。

spam statitics small

您的组件从一开始就通过它继承的类生成一些统计信息。让我们向垃圾邮件过滤组件添加一些统计信息:

@Override
public void getStatistics(StatisticsList list) {
  super.getStatistics(list);
  list.add(getName(), "Spam messages found", totalSpamCounter, Level.INFO);
  list.add(getName(), "All messages processed", messagesCounter, Level.FINER);
  if (list.checkLevel(Level.FINEST)) {
    // Some very expensive statistics generation code...
  }
}

代码应该是不言自明的。

你必须调用 super.getStatistics(...​) 来更新父类的统计信息。 StatisticsList 是一个集合,它以一种易于更新、搜索和检索它们的方式保存所有统计信息。您实际上不需要了解所有实现细节,但如果您有兴趣,请参阅源代码和JavaDoc文档。

add(...​) 方法的第一个参数是组件名称。所有统计信息都按组件名称分组,以便更轻松地查看特定组件数据。接下来是元素的描述。第三个参数是元素值,可以是任意数字或字符串。

最后一个参数可能是最有趣的。这个想法是从日志框架中借来的。每个统计项目都有重要性级别。级别与 SEVERE 最关键和 FINEST 最不重要的日志记录方法完全相同。添加了此参数以提高性能和统计信息检索。创建 StatisticsList 对象时,它会被分配用户请求的级别。如果 add(...​) 方法以较低的优先级调用,则该元素甚至不会添加到列表中。这节省了网络带宽,提高了统计检索速度,也更清晰地呈现给最终用户。

一开始可能有点令人困惑的是,如果有一个数字元素添加到具有 0 值的统计数据中,则级别总是被强制为 FINEST。假设管理员通常对 零值 统计信息不感兴趣,因此除非他有意请求最低级别的统计信息,否则他不会看到带有 零值 的元素。

if 语句也需要一些解释。通常添加一个新的统计元素并不是一个非常昂贵的操作,因此在适当的级别使用 add(...​) 方法传递它就足够了。但是,有时准备统计数据可能会非常昂贵,例如从数据库中读取/计算一些记录。统计数据可以非常频繁地收集,因此如果不使用,那么收集统计数据根本没有意义,因为当前级别高于​​我们通过的项目。在这种情况下,建议测试元素级别是否会被集合接受,如果不完全跳过整个处理。

如您所见,用于生成和呈现组件统计信息的API非常简单明了。只有一种覆盖方法和一种简单的方法来传递您自己的计数器。下面是示例组件的完整代码:

示例组件代码。

public class TestComponent extends AbstractKernelBasedComponent {

  private static final Logger log = Logger.getLogger(TestComponent.class.getName());

  @Inject
  private TestModule testModule;

  @Override
  public synchronized void everyMinute() {
    super.everyMinute();
    testModule.everyMinute();
  }

  @Override
  public String getComponentVersion() {
  String version = this.getClass().getPackage().getImplementationVersion();
    return version == null ? "0.0.0" : version;
  }

  @Override
  public String getDiscoDescription() {
    return "Spam filtering";
  }

  @Override
  public String getDiscoCategoryType() {
      return "spam";
  }

  @Override
  public int hashCodeForPacket(Packet packet) {
    if (packet.getElemTo() != null) {
      return packet.getElemTo().hashCode();
    }
    // This should not happen, every packet must have a destination
    // address, but maybe our SPAM checker is used for checking
    // strange kind of packets too....
    if (packet.getStanzaFrom() != null) {
      return packet.getStanzaFrom().hashCode();
    }
    // If this really happens on your system you should look carefully
    // at packets arriving to your component and decide a better way
    // to calculate hashCode
    return 1;
  }

  @Override
  public boolean isDiscoNonAdmin() {
    return false;
  }

  @Override
  public int processingInThreads() {
    return Runtime.getRuntime().availableProcessors();
  }

  @Override
  public int processingOutThreads() {
    return Runtime.getRuntime().availableProcessors();
  }

  @Override
  protected void registerModules(Kernel kernel) {
    // here we need to register modules responsible for processing packets
  }

  @Override
  public void getStatistics(StatisticsList list) {
    super.getStatistics(list);
    list.add(getName(), "Spam messages found", testModule.getTotalSpamCounter(), Level.INFO);
    list.add(getName(), "All messages processed", testModule.getMessagesCounter(), Level.FINE);
    if (list.checkLevel(Level.FINEST)) {
      // Some very expensive statistics generation code...
    }
  }

}

示例模块代码。

@Bean(name = "test-module", parent = TestComponent.class, active = true)
public static class TestModule extends AbstractModule {

  private static final Logger log = Logger.getLogger(TestModule.class.getCanonicalName());

  private Criteria CRITERIA = ElementCriteria.name("message");
  private String[] FEATURES = { "tigase:x:spam-filter", "tigase:x:spam-reporting" };

  @ConfigField(desc = "Bad words", alias = "bad-words")
  private String[] badWords = {"word1", "word2", "word3"};
  @ConfigField(desc = "White listed addresses", alias = "white-list")
  private String[] whiteList = {"admin@localhost"};
  @ConfigField(desc = "Logged packet types", alias = "packet-types")
  private String[] packetTypes = {"message", "presence", "iq"};
  @ConfigField(desc = "Prefix", alias = "log-prepend")
  private String prependText = "Spam detected: ";
  @ConfigField(desc = "Secure logging", alias = "secure-logging")
  private boolean secureLogging = false;
  @ConfigField(desc = "Abuse notification address", alias = "abuse-address")
  private JID abuseAddress = JID.jidInstanceNS("abuse@locahost");
  @ConfigField(desc = "Frequency of notification", alias = "notification-frequency")
  private int notificationFrequency = 10;
  private int delayCounter = 0;
  private long spamCounter = 0;
  private long totalSpamCounter = 0;
  private long messagesCounter = 0;


  @Inject
  private TestComponent component;

  public void everyMinute() {
    if ((++delayCounter) >= notificationFrequency) {
      write(Message.getMessage(abuseAddress, component.getComponentId(), StanzaType.chat,
                               "Detected spam messages: " + spamCounter, "Spam counter", null,
                               component.newPacketId("spam-")));
      delayCounter = 0;
      spamCounter = 0;
    }
  }

  @Override
  public String[] getFeatures() {
    return FEATURES;
  }

  @Override
  public Criteria getModuleCriteria() {
    return CRITERIA;
  }

  public long getMessagesCounter() {
    return messagesCounter;
  }

  public long getTotalSpamCounter() {
    return totalSpamCounter;
  }

  public void setPacketTypes(String[] packetTypes) {
    this.packetTypes = packetTypes;
    Criteria crit = new Or();
    for (String packetType : packetTypes) {
      crit.add(ElementCriteria.name(packetType));
    }
    CRITERIA = crit;
  }

  @Override
  public void process(Packet packet) throws ComponentException, TigaseStringprepException {
    // Is this packet a message?
    if ("message" == packet.getElemName()) {
      component.updateServiceDiscoveryItem(component.getName(), "messages",
                                           "Messages processed: [" + (++messagesCounter) + "]", true);
      String from = packet.getStanzaFrom().toString();
      // Is sender on the whitelist?
      if (Arrays.binarySearch(whiteList, from) < 0) {
        // The sender is not on whitelist so let's check the content
        String body = packet.getElemCDataStaticStr(Message.MESSAGE_BODY_PATH);
        if (body != null && !body.isEmpty()) {
          body = body.toLowerCase();
          for (String word : badWords) {
            if (body.contains(word)) {
              log.finest(prependText + packet.toString(secureLogging));
              ++spamCounter;
              component.updateServiceDiscoveryItem(component.getName(), "spam", "Spam caught: [" +
                                                   (++totalSpamCounter) + "]", true);
              return;
            }
          }
        }
      }
    }
    // Not a SPAM, return it for further processing
    Packet result = packet.swapFromTo();
    write(result);
  }
}

7.6. 组件实现 - 第 6 课 - 脚本支持

脚本支持是Tigase服务器内置的基本API,可自动用于任何组件,无需额外资源成本。但是,此框架只能访问由您的代码从父类继承的现有组件变量。它无法访问您在组件中添加的任何数据或任何结构。将您的一些数据暴露给脚本API需要一点努力。

本指南展示了如何使用组件特定的数据结构扩展现有的脚本API。

将您的组件实现与脚本API集成就像下面的代码一样简单:

private static final String BAD_WORDS_VAR = "badWords";
private static final String WHITE_LIST_VAR = "whiteList";

@Override
public void initBindings(Bindings binds) {
  super.initBindings(binds);
  binds.put(BAD_WORDS_VAR, testModule.badWords);
  binds.put(WHITE_LIST_VAR, testModule.whiteList);
}

这样,您可以将两个组件变量:badWordswhiteList 以相同的名称暴露给脚本 - 两个已定义的常量。您当然可以使用不同的名称,但保持简单明了总是一个好主意,因此我们在组件和脚本中使用相同的变量名称。

几乎完成了,几乎……​在我们的旧实现中,这两个变量是 String 的Java数组。因此,我们只能更改它们的元素,但不能在脚本内的这些结构中添加或删除元素。这不是很实用,它对脚本的代码施加了一些严重的限制。为了克服这个问题,我更改了测试组件代码以在 java.util.Set 集合中保留坏词和白名单。这给了我们足够的灵活性来操作数据。

由于我们的组件现在已准备好与脚本API合作,我现在将演示如何使用脚本和临时命令添加删除或更改这些集合的元素。

test comp newscript

首先,浏览服务器服务发现并双击测试组件。如果您使用 Psi 客户端,这应该会为您带来一个带有临时命令列表的新窗口。其他客户端可能会以不同的方式呈现可用的临时命令。

下面的屏幕截图显示了它的外观。您必须为脚本和ID字符串提供一些描述。我们在本指南中使用Groovy,但您也可以使用任何不同的脚本语言。

badwords list script

有关如何添加对更多语言的支持的所有详细信息,请参阅Tigase脚本文档。从Tigase API的角度来看,一切看起来都一样。您必须从右侧显示的窗口的下拉列表中选择适当的语言。如果您的首选语言不在列表中,则表示它没有正确安装并且Tigase无法检测到它。

提取当前坏词列表的脚本可以像以下Groovy代码一样简单:

def badw = (java.util.Set)badWords
def result = ""
for (s in badw) { result += s + "\n" }
return result

正如您从代码中看到的,您必须将组件变量引用到脚本中的变量,以确保使用正确的类型。剩下的就很简单了,是纯脚本语言的东西。

将脚本加载到服务器并执行它。您应该会收到一个新窗口,其中列出了垃圾邮件过滤器当前使用的所有坏词。

下面是另一个简单的脚本,它允许从列表中更新(添加/删除)坏词。

import tigase.server.Command
import tigase.server.Packet

def WORDS_LIST_KEY = "words-list"
def OPERATION_KEY = "operation"
def REMOVE = "Remove"
def ADD = "Add"
def OPERATIONS = [ADD, REMOVE]

def badw = (java.util.Set)badWords
def Packet p = (Packet)packet
def words = Command.getFieldValue(p, WORDS_LIST_KEY)
def operation = Command.getFieldValue(p, OPERATION_KEY)

if (words == null) {
  // No data to process, let's ask user to provide
  // a list of words
  def res = (Packet)p.commandResult(Command.DataType.form)
  Command.addFieldValue(res, WORDS_LIST_KEY, "", "Bad words list")
  Command.addFieldValue(res, OPERATION_KEY, ADD, "Operation",
    (String[])OPERATIONS, (String[])OPERATIONS)
  return res
}

def words_list = words.tokenize(",")

if (operation == ADD) {
  words_list.each { badw.add(it.trim()) }
  return "Words have been added."
}

if (operation == REMOVE) {
  words_list.each { badw.remove(it.trim()) }
  return "Words have been removed."
}

return "Unknown operation: " + operation

这两个脚本只是开始。可能性是无穷无尽的,只需在测试组件中添加几行简单的代码,您就可以在运行时通过执行各种操作的脚本来扩展您的应用程序;您可以重新加载脚本,添加和删除它们,根据需要扩展和修改功能。无需重新启动服务器,无需重新编译代码,您可以使用任何您喜欢的脚本语言。

当然,白名单修改的脚本看起来完全一样,在这里附加它们是没有意义的。

这是测试组件的完整代码,其中包含开头描述的新方法,数据结构从 String*s 数组更改为 Java *Set

示例组件代码。

public class TestComponent extends AbstractKernelBasedComponent {

  private static final Logger log = Logger.getLogger(TestComponent.class.getName());

  private static final String BAD_WORDS_KEY = "bad-words";
  private static final String WHITELIST_KEY = "white-list";

  @Inject
  private TestModule testModule;

  @Override
  public synchronized void everyMinute() {
    super.everyMinute();
    testModule.everyMinute();
  }

  @Override
  public String getComponentVersion() {
  String version = this.getClass().getPackage().getImplementationVersion();
    return version == null ? "0.0.0" : version;
  }

  @Override
  public String getDiscoDescription() {
    return "Spam filtering";
  }

  @Override
  public String getDiscoCategoryType() {
      return "spam";
  }

  @Override
  public int hashCodeForPacket(Packet packet) {
    if (packet.getElemTo() != null) {
      return packet.getElemTo().hashCode();
    }
    // This should not happen, every packet must have a destination
    // address, but maybe our SPAM checker is used for checking
    // strange kind of packets too....
    if (packet.getStanzaFrom() != null) {
      return packet.getStanzaFrom().hashCode();
    }
    // If this really happens on your system you should look carefully
    // at packets arriving to your component and decide a better way
    // to calculate hashCode
    return 1;
  }

  @Override
  public boolean isDiscoNonAdmin() {
    return false;
  }

  @Override
  public int processingInThreads() {
    return Runtime.getRuntime().availableProcessors();
  }

  @Override
  public int processingOutThreads() {
    return Runtime.getRuntime().availableProcessors();
  }

  @Override
  protected void registerModules(Kernel kernel) {
    // here we need to register modules responsible for processing packets
  }

  @Override
  public void getStatistics(StatisticsList list) {
    super.getStatistics(list);
    list.add(getName(), "Spam messages found", testModule.getTotalSpamCounter(), Level.INFO);
    list.add(getName(), "All messages processed", testModule.getMessagesCounter(), Level.FINE);
    if (list.checkLevel(Level.FINEST)) {
      // Some very expensive statistics generation code...
    }
  }

    @Override
    public void initBindings(Bindings binds) {
        super.initBindings(binds);
        binds.put(BAD_WORDS_KEY, testModule.badWords);
        binds.put(WHITELIST_KEY, testModule.whiteList);
    }

}

示例模块代码。

@Bean(name = "test-module", parent = TestComponent.class, active = true)
public static class TestModule extends AbstractModule {

  private static final Logger log = Logger.getLogger(TestModule.class.getCanonicalName());

  private Criteria CRITERIA = ElementCriteria.name("message");
  private String[] FEATURES = { "tigase:x:spam-filter", "tigase:x:spam-reporting" };

  @ConfigField(desc = "Bad words", alias = "bad-words")
  protected CopyOnWriteArraySet<String> badWords = new CopyOnWriteArraySet<>(Arrays.asList(new String[] {"word1", "word2", "word3"}));
  @ConfigField(desc = "White listed addresses", alias = "white-list")
  protected CopyOnWriteArraySet<String> whiteList = new CopyOnWriteArraySet<>(Arrays.asList(new String[] {"admin@localhost"}));
  @ConfigField(desc = "Logged packet types", alias = "packet-types")
  private String[] packetTypes = {"message", "presence", "iq"};
  @ConfigField(desc = "Prefix", alias = "log-prepend")
  private String prependText = "Spam detected: ";
  @ConfigField(desc = "Secure logging", alias = "secure-logging")
  private boolean secureLogging = false;
  @ConfigField(desc = "Abuse notification address", alias = "abuse-address")
  private JID abuseAddress = JID.jidInstanceNS("abuse@locahost");
  @ConfigField(desc = "Frequency of notification", alias = "notification-frequency")
  private int notificationFrequency = 10;
  private int delayCounter = 0;
  private long spamCounter = 0;
  private long totalSpamCounter = 0;
  private long messagesCounter = 0;


  @Inject
  private TestComponent component;

  public void everyMinute() {
    if ((++delayCounter) >= notificationFrequency) {
      write(Message.getMessage(abuseAddress, component.getComponentId(), StanzaType.chat,
                               "Detected spam messages: " + spamCounter, "Spam counter", null,
                               component.newPacketId("spam-")));
      delayCounter = 0;
      spamCounter = 0;
    }
  }

  @Override
  public String[] getFeatures() {
    return FEATURES;
  }

  @Override
  public Criteria getModuleCriteria() {
    return CRITERIA;
  }

  public int getMessagesCounter() {
    return messagesCounter;
  }

  public int getTotalSpamCounter() {
    return totalSpamCounter;
  }

  public void setPacketTypes(String[] packetTypes) {
    this.packetTypes = packetTypes;
    Criteria crit = new Or();
    for (String packetType : packetTypes) {
      crit.add(ElementCriteria.name(packetType));
    }
    CRITERIA = crit;
  }

  @Override
  public void process(Packet packet) throws ComponentException, TigaseStringprepException {
    // Is this packet a message?
    if ("message" == packet.getElemName()) {
      component.updateServiceDiscoveryItem(component.getName(), "messages",
                                           "Messages processed: [" + (++messagesCounter) + "]", true);
      String from = packet.getStanzaFrom().toString();
      // Is sender on the whitelist?
      if (!whiteList.contains(from)) {
        // The sender is not on whitelist so let's check the content
        String body = packet.getElemCDataStaticStr(Message.MESSAGE_BODY_PATH);
        if (body != null && !body.isEmpty()) {
          body = body.toLowerCase();
          for (String word : badWords) {
            if (body.contains(word)) {
              log.finest(prependText + packet.toString(secureLogging));
              ++spamCounter;
              component.updateServiceDiscoveryItem(component.getName(), "spam", "Spam caught: [" +
                                                   (++totalSpamCounter) + "]", true);
              return;
            }
          }
        }
      }
    }
    // Not a SPAM, return it for further processing
    Packet result = packet.swapFromTo();
    write(result);
  }
}

7.7. 组件实现 - 第7课 - 数据存储库

7.7.1. ConfigRepository

在某些情况下,您希望组件永久存储一些数据。您当然可以使用组件配置来提供一些数据库连接设置,实现自己的数据库连接器并存储您需要的记录。但是,有一个非常简单且有用的框架,它允许您在数据库或磁盘文件中透明地读取和存储一些数据。该框架还直接支持ad-hoc命令接口,因此您可以使用XMPP客户端操作组件数据。

为了使用它,需要扩展 tigase.db.comp.ConfigRepository 抽象类。

7.7.2. 访问UserRepository或AuthRepository

要使用 AuthRepositoryUserRepository,您只需正确声明字段并使用 @Inject 注释它们。该字段必须是由Tigase Kernel管理的类的一部分 - 组件的类或使用 @Bean 注释的任何类。对于该类,将通过依赖注入来注入适当的存储库实例。

AuthRepository和UserRepository 的示例用法。

@Inject
private AuthRepository authRepository;
@Inject
private UserRepository userRepository;

7.7.3. 访问其他存储库

为了在访问存储库时拥有更多自由,可以创建和使用实现 DataSourceAware 接口的自定义存储库实现。

对于我们的示例,假设它将是实现 TestRepositoryIfc 的类,并且我们的实现将使用JDBC。为了使其工作,我们需要将 TestRepositoryIfc 定义为扩展 DataSourceAware 接口的通用接口。 DataSourceAware 接口将为Tigase XMPP服务器内部所需的方法提供定义,以基于 TestRepositoryIfc 初始化自定义存储库类。

TestRepositoryIfc.

public interface TestRepositoryIfc<DS extends DataSource> extends DataSourceAware<DS> {
  // Example method
  void addItem(BareJID userJid, String item) throws RepositoryException;
}

接下来我们需要准备存储库的实际实现 - 负责执行SQL语句的类。在这个类中,我们需要实现我们的接口和方法 void setDataSource(DataSource dataSource) 中的所有方法,该方法来自 DataSourceAware 接口。在这个方法中,我们需要初始化数据源,即创建准备好的语句。我们应该使用 @Repository.Meta 注释来注释我们的新类,这将允许Tigase XMPP服务器在类实现 TestRepositoryIfc 并支持带有jdbc URI的数据源时找到这个类。

@Repository.Meta(supportedUris = "jdbc:.*")
public static class JDBCTestRepository implements TestRepositoryIfc<DataRepository> {

  private static final String SOME_STATEMENT = "select * from tig_users";

  private DataRepository repository;

  @Override
  public void setDataSource(DataRepository repository) {
    // here we need to initialize required prepared statements
    try {
      repository.initPreparedStatement(SOME_STATEMENT, SOME_STATEMENT);
    } catch (SQLException ex) {
      throw new RuntimeException("Could not initialize repository", ex);
    }
    this.repository = repository;
  }

  @Override
  public void addItem(BareJID userJid, String item) throws RepositoryException {
    try {
      PreparedStatement stmt = repository.getPreparedStatement(userJid, SOME_STATEMENT);
      synchronized (stmt) {
        // do what needs to be done
      }
    } catch (SQLException ex) {
      throw new RepositoryException(ex);
    }
  }
}

如您所见,我们为接口 TestRepositoryIfc 定义了数据源通用参数的类型。有了这个,我们确保只提供实现 DataRepository 接口的实例,并且幸亏如此我们不需要在访问数据源之前将提供的 DataSource 实例转换为这个接口。

有了这些,我们需要创建一个类来增加对多数据库设置的支持。在我们的例子中,它将是 TestRepositoryMDBean,它将负责发现存储库类、初始化和重新注入数据源。必须这样做,因为刚才提到我们的 TestRepositoryMDBean 将负责 JDBCTestRepository 的初始化(实际上这将由 TestRepositoryMDBean 扩展的 MDRepositoryBean 完成)。

@Bean(name = "repository", parent = TestComponent.class, active = true)
public static class TestRepositoryMDBean extends MDRepositoryBeanWithStatistics<TestRepositoryIfc>
    implements TestRepositoryIfc {

  public TestRepositoryMDBean() {
    super(TestRepositoryIfc.class);
  }

  @Override
  public Class<?> getDefaultBeanClass() {
    return TestRepositoryConfigBean.class;
  }

  @Override
  public void setDataSource(DataSource dataSource) {
    // nothing to do here
  }

  @Override
  public void addItem(BareJID userJid, String item) throws RepositoryException {
    getRepository(userJid.getDomain()).addItem(userJid, item);
  }

  @Override
  protected Class<? extends TestRepositoryIfc> findClassForDataSource(DataSource dataSource)
                throws DBInitException {
    return DataSourceHelper.getDefaultClass(TestRepositoryIfc.class, dataSource.getResourceUri());
  }

  public static class TestRepositoryConfigBean extends MDRepositoryConfigBean<TestRepositoryIfc> {
  }
}

在所有基于 MDRepositoryBeanWithStatistics 的实现中,大部分代码都是相同的。在我们的例子中,唯一的自定义方法是 void addItem(…​) 它使用 getRepository(String domain) 方法来检索域的正确存储库。需要为 TestRepositoryIfc 的每个自定义方法完成对域的实际存储库实例的检索。

小技巧

也可以扩展 MDRepositoryBeanSDRepositoryBean 而不是 MDRepositoryBeanWithStatistics。但是,如果您决定扩展没有 withStatistics 后缀的抽象存储库bean类,则不会收集与该存储库的使用相关的统计数据。唯一的变化是您不需要将接口类传递给超类的构造函数,因为它不需要。

备注

如上所述,也可以扩展 SDRepostioryBeanSDRepositoryBeanWithStatistics。在扩展 MDRepositoryBeanWithStatistics 的情况下,您需要实现的方法是相同的,但是在内部,SDRepositoryBean 将不支持为不同的域使用不同的存储库。事实上,SDRepositoryBeanWithStatistics 只有一个存储库实例,所有域只使用一个数据源。如果只配置了一个 default 存储库实例,则 MDRepositoryBeanWithStatistics 会呈现相同的行为。然而,MDRepositoryBeanWithStatistics 提供了更好的灵活性,因此不鼓励使用 SDRepositoryBeanSDRepositoryBeanWithStatistics

虽然这比以前的版本更难实现,但它为您提供了对多数据库设置的支持,并为您提供了可用于诊断的数据库查询时间的统计信息。

正如您还可以看到的,我们使用 @Bean 注释对 TestRepositoryMDBean 进行了注释,这将强制Tigase内核在每次加载 TestComponent 时加载它。这样,只需创建一个字段并对其进行注释,就可以将此类的实例作为对该组件(即组件、模块等)使用的任何bean的依赖项注入:

@Inject
private TestRepositoryIfc testRepository;

小技巧

TestRepositoryMDBeantestRepository 字段实例将被注入。

备注

如果我们打算使用我们的存储库的类深深嵌套在内核依赖项中,并且我们想要利用自动模式版本控制,我们必须在我们的类中实现 tigase.kernel.beans.RegistrarBean

7.7.4. 配置

我们的类 TestRepositoryMDBean@Bean 注释,将其名称设置为 repository 并将parent 设置为 TestComponent。该组件的实例是通过在Tigase XMPP服务器配置文件中的 test 名称下使用来配置的。因此,与我们的存储库相关的所有配置都应该放在 test 部分的 repository 部分中。

示例。

test(class: TestComponent) {
    repository () {
        // repository related configuration
    }
}

默认值

如上所述,如果我们使用 MDRepositoryBeanWithStatistics 作为 TestRepositoryMDBean 的基类,那么我们可能有不同的数据源用于不同的域。默认情况下,如果我们不配置它,MDRepositoryBeanWithStatistics 将只创建一个名为``default`` 的存储库实例。它将用于所有域,默认情况下,它将使用与存储库实例命名相同的数据源 - 它将使用名为 default 的数据源。此默认值等于在配置文件中输入的以下配置:

test(class: TestComponent) {
    repository () {
        default () {
            dataSourceName = 'default'
        }
    }
}

更改存储库使用的数据源

可以使任何存储库使用与以与存储库实例相同的名称配置的数据源不同的数据源。为此,您需要将存储库实例的 dataSourceName 属性设置为它应该使用的数据源的名称。

示例设置存储库 default 以使用名为 test 的数据源。

test(class: TestComponent) {
    repository () {
        default () {
            dataSourceName = 'test'
        }
    }
}

为域配置单独的存储库

要配置要用于特定域的存储库实例,您需要定义与应该使用它的域同名的存储库。默认情况下,它将使用名称与域名相同的数据源。

使用名为 example.com 的数据源为 example.com 单独存储库。

dataSource () {
    // configuration of data sources here is not complete
    default () {
        uri = "jdbc:derby:/database"
    }
    'example.com' () {
        uri = "jdbc:derby/example"
    }
}

test(class: TestComponent) {
    repository () {
        default () {
        }
        'example.com' () {
        }
    }
}

使用名为 test 的数据源为 example.com 单独存储库。

dataSource () {
    // configuration of data sources here is not complete
    default () {
        uri = "jdbc:derby:/database"
    }
    'test' () {
        uri = "jdbc:derby/example"
    }
}

test(class: TestComponent) {
    repository () {
        default () {
        }
        'example.com' () {
            dataSourceName = 'test'
        }
    }
}

备注

在上面介绍的两个示例中,对于 example.com 以外的域,将使用名为 default 的存储库实例,并将使用名为 default 的数据源。

7.7.5. 存储库版本控制

在创建自定义实现时,还可以启用存储库版本控制功能。有几个部分/步骤可以充分利用这种机制。

每个 DataSource 都有一个 tig_schema_versions 表,其中包含有关安装在与特定DataSource关联的数据库中的组件模式版本的信息。

在实现中启用版本检查

首先,存储库实现应该实现 tigase.db.util.RepositoryVersionAware 接口(它的所有方法都是默认定义的)并用 tigase.db.Repository.SchemaId 注释它。例如 .Repository 用 SchemaId 注释并实现 RepositoryVersionAware

@Repository.SchemaId(id = "test-component", name = "Test Component")
public static class TestRepositoryMDBean extends MDRepositoryBeanWithStatistics<TestRepositoryIfc>
    implements TestRepositoryIfc {
}

仅此操作将导致在Tigase XMPP服务器启动和存储库初始化期间执行检查表、索引、存储过程和其他元素是否存在于所需版本的配置数据源中。默认情况下,所需版本与实现版本匹配(通过调用 java.lang.Package.getImplementationVersion() 获得),但是可以手动指定所需版本,或者:

  • 通过利用 tigase.db.util.RepositoryVersionAware.SchemaVersion 注释:

@Repository.SchemaId(id = "test_component", name = "Test Component")
@RepositoryVersionAware.SchemaVersion(version = "0.0.1")
public static class TestRepositoryMDBean extends MDRepositoryBeanWithStatistics<TestRepositoryIfc>
    implements TestRepositoryIfc {
…
}
  • 或者通过覆盖 tigase.db.util.RepositoryVersionAware.getVersion 方法:

@Override
public Version getVersion() {
    return "0.0.1";
}

处理错误版本和升级

要检测数据库中的版本信息不足,将发生以下逻辑:

  • 如果数据库中没有版本信息,则服务将完全停止,提示安装模式(根据用户偏好通过 update-schemainstall-schema);

  • 如果存储库中有关于加载的组件模式版本的信息,并且所需模式版本的基本部分(即仅考虑 major.minor.bugfix 部分)与存储库中存在的不同,则:

    • 如果所需的组件模式版本是 final (即非 SNAPSHOT),服务器将关闭并在日志文件(即 logs/tigase-console.log)中打印强制用户的终端错误升级架构;

    • 如果所需的组件模式版本是 非最终版本 (即具有 SNAPSHOT 部分),那么将在日志文件(即 logs/tigase-console.log)中打印警告提示由于架构可能发生变化,用户运行升级过程,但 server不会stop;

将通过执行以下命令来升级数据库中加载的模式:

./scripts/tigase.sh upgrade-schema etc/tigase.conf

上述命令将加载当前配置、所有已配置数据源和已启用组件的信息,然后在相应数据源中对每个已配置组件的架构进行升级。

根据数据库的类型(或指定的注释),内部处理升级过程的方式略有不同。

关系数据库(外部处理)

对于所有关系数据库(MySQL、PostgreSQL、MS SQL Server 等),我们强烈建议使用以下命名约定将完整的数据库模式存储在外部文件中:<database_type>-<component_name>-<version>.sql,对于用于MySQL的测试组件版本0.0.5的示例完整模式将存储在名为 mysql-test-0.0.5.sql 的文件中。更重要的是 - 架构文件必须存储在Tigase XMPP Server安装目录的 database/ 子目录下。

备注

这可以通过 Repository.SchemaId 注释的 external 属性来控制,默认为”true”,如果设置为 false,那么处理将按照 关系数据库(外部处理) 中的描述进行

例如:

  • database/mysql-test-0.0.1.sql

  • database/mysql-test-0.0.2.sql

  • database/mysql-test-0.0.3.sql

  • database/mysql-test-0.0.4.sql

  • database/mysql-test-0.0.5.sql

在升级过程中,所有必需的模式文件都将按版本升序加载。版本范围将取决于条件并遵循简单的规则:

  • 范围的开始将从数据库中当前加载的版本的下一个版本开始(例如,如果加载到数据库的当前版本是0.0.3并且我们正在部署组件版本0.0.5那么SchemaLoader将尝试从文件加载模式:database/mysql-test-0.0.4.sqldatabase/mysql-test-0.0.5.sql

  • 如果我们尝试部署组件的 SNAPSTHOT 版本,则与该版本匹配的模式文件将始终包含在要加载的文件列表中(例如,如果我们尝试部署组件版本为0.0.0的夜间构建。 0.0.5-SNAPSHOT 并且当前在数据库中加载的模式版本是 0.0.5 那么SchemaLoader将在要加载的文件列表中包含数据库 database/mysql-test-0.0.5.sql

也可以跳过上面的过滤逻辑并通过将 --forceReloadAllSchemaFiles=true 参数附加到 upgrade-schema/install-schema``来强制从``database/ 目录加载特定组件/数据库的所有架构文件。

非关系数据库(内部处理)

如果需要在内部处理数据库模式(例如对于像NoSQL数据库这样的情况,或者只是有这样的偏好),那么可以通过将 Repository.SchemaId 注释的 external 属性设置为 false

@Repository.SchemaId(id = "test_component", name = "Test Component", external = false)

在这种情况下,应该实现来自 tigase.db.util.RepositoryVersionAware 接口的 updateSchema 方法来处理架构的安装/更新。它需要两个参数:

  • Optional<Version> oldVersion - 指示加载到数据库的架构的当前版本(如果存在)

  • Version newVersion - 表明所需的版本(组件的版本或存储库的特定版本)

在数据库中设置所需的存储库版本

每个版本化的架构文件应在末尾包含负责以存储过程调用的形式设置加载架构的适当版本的代码,其中组件的名称和版本作为参数:

  • Postgresql

-- QUERY START:
select TigSetComponentVersion('test_component', '0.0.5');
-- QUERY END:
  • MsSQL服务器

-- QUERY START:
exec TigSetComponentVersion 'test_component', '0.0.5';
-- QUERY END:
GO
  • MySQL

-- QUERY START:
call TigSetComponentVersion('test_component', '0.0.5');
-- QUERY END:
  • Derby

-- QUERY START:
call TigSetComponentVersion('test_component', '0.0.5');
-- QUERY END:

在内部处理架构的情况下,成功加载后(即执行已实现的 tigase.db.util.RepositoryVersionAware.updateSchema 方法返回 tigase.db.util.SchemaLoader.Result.ok )数据库中的版本将设置为组件的当前版本。

这允许(在外部处理模式的情况下)通过直接将 .sql 文件导入数据库来手动加载它。

7.8. 组件实现 - 第8课 - 组件的生命周期

7.8.1. 组件的初始化

Tigase中的启动挂钩与关闭挂钩不同。

这是因为您无法真正确定启动时间的确切时间。是在应用程序启动时,是在加载配置时,还是在初始化所有对象时。对于每个组件,这甚至可能有所不同。因此,实际上Tigase中并没有与关闭挂钩相同意义的启动挂钩。

有几个方法在组件启动时按以下顺序调用:

  1. 构造函数 - 当然有没有参数的构造函数。但是,它不保证组件的这个实例将被使用。可以创建该对象只是为了获取配置字段的默认值,然后可能会被销毁。

  2. Getters/Setters - 在组件初始化的第二步,内核通过读取和设置带有 @ConfigField() 注释的字段的值来配置组件。如果有一个与注释字段同名的公共getter或setter - 它将被使用。

  3. void beanConfigurationChanged(Collection<String> changedFields) (可选) - 如果组件实现了 ConfigurationChangedAware 接口,将调用此方法来通知组件哪些字段的值已更改。如果组件内部依赖于存储在多个字段中的配置,这很有用,因为它只允许您重新配置组件内部一次。

  4. void register(Kernel kernel) (optional) - 如果组件实现 RegistrarBean 接口,则调用此方法以允许注册组件私有bean。

  5. 依赖注入 - 在此期间,内核将bean注入到带有 @Inject 注释的组件字段中。如果此字段的公共getter或setter存在 - 内核将使用它们。

  6. void initialized() (optional) - 如果组件实现了 Initializable 接口以通知它配置已设置并注入依赖项,则调用它。

  7. void start() - 在此调用组件启动期间,它是内部作业或工作线程或未来活动所需的任何东西。组件的队列和线程此时被初始化。 (此方法返回后组件就绪)

因此,如果您想确保组件已完全加载、初始化并正常运行,那么 start() 钩子是最佳点。

小技巧

组件实例只能启动和停止一次,但是可以在Tigase XMPP服务器正常运行期间创建具有相同名称的相同组件的新实例,即作为服务器重新配置的结果。

7.8.2. 重新配置

在组件实例的生命周期中,可能会重新配置Tigase XMPP服务器。如果此组件的配置更改与其活动无关,则内核将设置带有 @ConfigField() 注释的更改字段的值。在这种情况下,可以使用公共字段设置器。

小技巧

如果组件实现了 ConfigurationChangedAware 接口,则将调用方法 void beanConfigurationChanged(Collection<String> changedFields) 来通知组件哪些字段的值已更改。如果同一组件内部依赖于存储在多个字段中的配置,这很有用,因为它允许您重新配置此内部一次。

7.8.3. 更新注入的依赖项

在组件实例的生命周期中,可能会发生由于重新配置服务器而需要注入其他bean作为组件的依赖项。在这种情况下,Tigase内核会将依赖项注入到带有 @Inject 注释的字段中,该字段的值需要更新。

7.8.4. 停止组件

组件实例可以在Tigase XMPP服务器运行时的任何时候停止,即由于重新配置,或由于服务器正常关闭。

在这两种情况下,将调用组件的以下方法:

  1. void stop() - 第一种方法停止组件内部处理队列。

  2. void beforeUnregister() (可选) - 如果组件实现了@UnregisterAware@ 接口,则调用此方法来通知组件实例它正在被卸载。

  3. void unregister(Kernel kernel) (optional) - 如果组件实现了 RegistrarBean 被调用来为组件提供一种注销bean的方法(如果需要)。