Integration

Mithqtt lets connected devices interact with each other easily and securely, but in most cases this is not enough. Mithqtt exposes a pre-configured interface for integrating with the user's server-side or cloud-side applications; data processing and event handling are then implemented by the user's application.

1. Cluster API

Mithqtt exposes its integration interface through Cluster API implementations, and the Mithqtt project ships with an implementation based on the NATS.io message queue. Users can also create their own Cluster implementation on top of another message queue or data grid, such as Kafka, AMQP, or Hazelcast.

The essential Cluster API is:

package com.github.longkerdandy.mithqtt.api.cluster;

import com.github.longkerdandy.mithqtt.api.message.Message;
import org.apache.commons.configuration.AbstractConfiguration;

/**
 * Cluster
 */
public interface Cluster {

    /**
     * Init the cluster
     *
     * @param config  Configuration
     * @param factory Cluster Listener Factory
     * @throws ClusterException if an exception occurs while initializing the cluster.
     */
    void init(AbstractConfiguration config, ClusterListenerFactory factory) throws ClusterException;

    /**
     * Destroy the cluster
     */
    void destroy();

    /**
     * Send message to broker
     *
     * @param brokerId Broker Id
     * @param message  Message
     */
    void sendToBroker(String brokerId, Message message);

    /**
     * Send message to outside
     * Other application can pick up the message from there
     *
     * @param message Message
     */
    void sendToApplication(Message message);
}
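
To make the contract above more concrete, here is a minimal in-memory sketch. It is not a real Cluster implementation: SketchMessage, SketchListener, SketchListenerFactory and InMemoryCluster are hypothetical stand-ins for the mithqtt-api types, and init omits the configuration argument. In a real deployment the broker and the application run in separate processes connected by a message queue; here both ends are collapsed into one object purely for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for com.github.longkerdandy.mithqtt.api.message.Message.
final class SketchMessage {
    final String type;    // e.g. "PUBLISH"
    final String payload;

    SketchMessage(String type, String payload) {
        this.type = type;
        this.payload = payload;
    }
}

// Hypothetical stand-ins for ClusterListener and ClusterListenerFactory.
interface SketchListener {
    void onMessage(SketchMessage message);
}

interface SketchListenerFactory {
    SketchListener newListener();
}

// In-memory analogue of the Cluster interface: the same four operations,
// with simplified parameter types and no external message queue.
final class InMemoryCluster {
    private final Map<String, List<SketchMessage>> brokerInboxes = new HashMap<>();
    private SketchListener listener;

    // Mirrors Cluster.init: ask the factory for the listener that receives events.
    void init(SketchListenerFactory factory) {
        this.listener = factory.newListener();
    }

    // Mirrors Cluster.destroy: release the listener and drop queued messages.
    void destroy() {
        listener = null;
        brokerInboxes.clear();
    }

    // Mirrors Cluster.sendToBroker: queue the message for one specific broker.
    void sendToBroker(String brokerId, SketchMessage message) {
        brokerInboxes.computeIfAbsent(brokerId, id -> new ArrayList<>()).add(message);
    }

    // Mirrors Cluster.sendToApplication: hand the message to the application listener.
    void sendToApplication(SketchMessage message) {
        if (listener != null) {
            listener.onMessage(message);
        }
    }

    // Inspection helper for this sketch only; not part of the real interface.
    List<SketchMessage> inboxOf(String brokerId) {
        return brokerInboxes.getOrDefault(brokerId, new ArrayList<>());
    }
}
```

A queue-backed implementation would replace the map and the direct listener call with publish and subscribe operations on the chosen message queue, while keeping the same four entry points.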

2. Integrate via mithqtt-api package

The mithqtt-api package comes with an abstraction layer for integrating with server-side applications. It can be used as an SDK by users who want to develop a server-side integration.

The mithqtt-application-sample project demonstrates how to integrate with mithqtt-api. The sample simply reads messages from the Cluster interface and prints them to the logs.

In essence, the user needs to implement two interfaces.
The ClusterListenerFactory interface returns the ClusterListener implementation:

package com.github.longkerdandy.mithqtt.application.sample.cluster;

import com.github.longkerdandy.mithqtt.api.cluster.ClusterListener;
import com.github.longkerdandy.mithqtt.api.cluster.ClusterListenerFactory;

/**
 * Sample Application Cluster Listener Factory Implementation
 */
public class ApplicationClusterListenerFactoryImpl implements ClusterListenerFactory {

    @Override
    public ClusterListener newListener() {
        return new ApplicationClusterListenerImpl();
    }
}

The ClusterListener interface lets the user handle MQTT message events:

package com.github.longkerdandy.mithqtt.application.sample.cluster;

import com.github.longkerdandy.mithqtt.api.cluster.ClusterListener;
import com.github.longkerdandy.mithqtt.api.message.Message;
import com.github.longkerdandy.mithqtt.api.message.MqttPublishPayload;
import com.github.longkerdandy.mithqtt.api.message.MqttSubscribePayloadGranted;
import io.netty.handler.codec.mqtt.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sample Application Cluster Listener Implementation
 */
public class ApplicationClusterListenerImpl implements ClusterListener {

    private static final Logger logger = LoggerFactory.getLogger(ApplicationClusterListenerImpl.class);

    @Override
    public void onConnect(Message<MqttConnectVariableHeader, MqttConnectPayload> msg) {
        logger.debug("Received CONNECT message {}", msg);
    }

    @Override
    public void onSubscribe(Message<MqttPacketIdVariableHeader, MqttSubscribePayloadGranted> msg) {
        logger.debug("Received SUBSCRIBE message {}", msg);
    }

    @Override
    public void onUnsubscribe(Message<MqttPacketIdVariableHeader, MqttUnsubscribePayload> msg) {
        logger.debug("Received UNSUBSCRIBE message {}", msg);
    }

    @Override
    public void onPublish(Message<MqttPublishVariableHeader, MqttPublishPayload> msg) {
        logger.debug("Received PUBLISH message {}", msg);
    }

    @Override
    public void onDisconnect(Message<Void, Void> msg) {
        logger.debug("Received DISCONNECT message {}", msg);
    }
}

Finally, pass the ClusterListenerFactory as a parameter when initializing the Cluster implementation:

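// Class.newInstance() is deprecated since Java 9; go through the no-arg constructor instead
Cluster cluster = (Cluster) Class.forName(clusterConfig.getString("cluster.class")).getDeclaredConstructor().newInstance();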
cluster.init(clusterConfig, new ApplicationClusterListenerFactoryImpl());
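
The reflective start-up above can be sketched end to end with error handling. This is only an illustration under simplifying assumptions: PluggableCluster, RecordingCluster and ClusterLoader are hypothetical names, java.util.Properties stands in for the AbstractConfiguration used by mithqtt, and the cluster.name key is invented for the example. Only the cluster.class key comes from the snippet above.

```java
import java.util.Properties;

// Hypothetical stand-in for the Cluster interface (lifecycle methods only).
interface PluggableCluster {
    void init(Properties config);
    void destroy();
}

// Hypothetical implementation that records the last lifecycle event.
final class RecordingCluster implements PluggableCluster {
    static String lastEvent = "none";

    @Override
    public void init(Properties config) {
        // "cluster.name" is an invented key used only by this sketch.
        lastEvent = "init:" + config.getProperty("cluster.name", "unnamed");
    }

    @Override
    public void destroy() {
        lastEvent = "destroy";
    }
}

final class ClusterLoader {
    // Instantiate the class named by "cluster.class" and initialize it.
    static PluggableCluster load(Properties config) {
        String className = config.getProperty("cluster.class");
        if (className == null) {
            throw new IllegalArgumentException("cluster.class is not configured");
        }
        try {
            Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
            if (!(instance instanceof PluggableCluster)) {
                throw new IllegalArgumentException(className + " does not implement PluggableCluster");
            }
            PluggableCluster cluster = (PluggableCluster) instance;
            cluster.init(config);
            return cluster;
        } catch (ReflectiveOperationException e) {
            // Covers a missing class, a missing no-arg constructor and constructor failures.
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }
}
```

An application would typically pair the load call with a shutdown hook, for example Runtime.getRuntime().addShutdownHook(new Thread(cluster::destroy)), so that the cluster connection is released when the process exits.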

To build mithqtt-application-sample from source code:

cd mithqtt-application-sample
./gradlew clean build -x test

Running the sample on Linux/Unix/BSD/Mac OS X:

cd mithqtt-application-sample/build/distributions
tar -xvf mithqtt-application-sample-<VERSION>.tar
cd mithqtt-application-sample-<VERSION>
./bin/mithqtt-application-sample