Jump to content
ppulga22

Pub Sub Pattern

Recommended Posts

ppulga22

Olá pessoal. Estou a desenvolver um projeto, e empanquei aqui num detalhe e não estou a ver uma boa maneira de resolver isto. Bem, o projeto consiste em implementar o padrão Publisher Subscriber, a correr via TCP e UDP. Ora bem, eu tenho uma class central, onde armazenamos as mensagens, tópicos e subscribers, e onde estão implementados os métodos principais. Depois, tenho uma class UDPServer, e TCPServer, assim como os clientes. Como estou a trabalhar com singleton na class central, criei uma nova class Driver, só para arrancar ambos os servidores num main(). Posto isto, o meu problema está em partilhar as mensagens, o sistema está a funcionar muito bem, ambos os lados adicionam os subs, e as mensagens publicadas, o problema está em fazer o broadcast. Ou seja, nós recebemos uma mensagem do cliente, do tipo, {"type": pub, "topic": music, "payload": {"name": Valete, "álbum": Serviço Publico, "ano": 2006}}. Aqui o Server vai descodificar a mensagem e enviar para os subs do tópico music. Mas, se receber a mensagem do lado do Server TCP, não consigo enviá-la para o cliente UDP e vice versa. Alguma sugestão?

package com.machado.filipe.pubsub.server;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.json.JSONException;
import org.json.JSONObject;

import com.machado.filipe.pubsub.Message;
import com.machado.filipe.pubsub.publisher.Publisher;
import com.machado.filipe.pubsub.publisher.PublisherImpl;
import com.machado.filipe.pubsub.service.PubSubService;
import com.machado.filipe.pubsub.subscriber.Subscriber;
import com.machado.filipe.pubsub.subscriber.SubscriberImpl;


public class UDPServer {
       
        
        static PubSubService pubSubService;
      
        public void run() throws Exception {
     
            int serverport = 7777;        
     
            DatagramSocket udpServerSocket = new DatagramSocket(serverport);        
     
            System.out.println("Server started...\n");
     
            while(true)
            {
                byte[] receiveData = new byte[1024];          
     
                DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
     
                udpServerSocket.receive(receivePacket);           
     
                String clientMessage = (new String(receivePacket.getData())).trim();
     
                
                InetAddress clientIP = receivePacket.getAddress();           
     
            
              int clientport = receivePacket.getPort(); 
             
                byte[] sendData  = new byte[1024];

                decodeCommand(new String(clientMessage));
                   JSONObject jsonObj = new JSONObject(clientMessage);
                String type = jsonObj.getString("type");

                if(type.equalsIgnoreCase("pub")) {
                    String topic = jsonObj.getString("topic");
                    JSONObject payload = jsonObj.getJSONObject("payload");
                    Publisher publisher = new PublisherImpl();

                    Message message = new Message(topic, payload);
                    publisher.publish(message, pubSubService);
                    Set<Subscriber> _subscribers = PubSubService.instance().getSubscribers(topic);

                    for(Subscriber sub : _subscribers) {
                            sub.getMessagesForSubscriberOfTopic(topic);
                            
                            String response = sub.getMessages().toString();
                            sendData = response.getBytes();
                        
                          DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
                          clientIP, sub.getClientport()); 
                          udpServerSocket.send(sendPacket);
                            
                    }
                    
                }else if (type.equalsIgnoreCase("sub")) {
                    String topic = jsonObj.getString("topic");
                    Subscriber subscriber = new SubscriberImpl(clientIP, clientport);
                    subscriber.addSubscriber(topic, pubSubService);
                    subscriber.getMessagesForSubscriberOfTopic(topic);
                    String response = subscriber.getMessages().toString();
                    sendData = response.getBytes();
                
                  DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
                  clientIP, clientport); 
                  udpServerSocket.send(sendPacket);
                  
                    
                }else if (type.equalsIgnoreCase("unsub")) {
                    String topic = jsonObj.getString("topic");
                    for (Subscriber sub : PubSubService.instance().getSubscribers(topic)) {
                        if(sub.getClientport() == clientport && sub.getClientIP() == clientIP) {
                              sub.unSubscribe(topic);
                          }
                    }
                    
                    
                }
            
            }
        }
        private static void decodeCommand(String jsonString) throws JSONException {
              JSONObject jsonObject = new JSONObject(jsonString);
              System.out.println(jsonObject);  
              }
}

 Deixo esta classe para verem como está o server UDP, se precisarem de mais informação ou código, digam algo :D:D

 

Obrigado

Share this post


Link to post
Share on other sites
ppulga22

Tenho em alternativa a utilização de canais, mas não sei se me vai facilitar o processo. Deixo aqui o Server de ambos os protocolos.

package com.machado.filipe.pubsub.server;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.Channel;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.util.Iterator;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.json.JSONException;
import org.json.JSONObject;

import com.machado.filipe.pubsub.Message;
import com.machado.filipe.pubsub.publisher.Publisher;
import com.machado.filipe.pubsub.publisher.PublisherImpl;
import com.machado.filipe.pubsub.service.PubSubService;
import com.machado.filipe.pubsub.subscriber.Subscriber;
import com.machado.filipe.pubsub.subscriber.SubscriberImpl;

/**
 * A more robust daytime service, that handles TCP and UDP connections and
 * provides exception handling and error logging.
 */
public class NIOServer {
    public static void main(String args[]) {
        try {

            PubSubService pubSubService = new PubSubService();

            // Handle startup exceptions at the end of this block
            // Get an encoder for converting strings to bytes
            CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder();

            // Allow an alternative port for testing with non-root accounts
            InetAddress hostIP = InetAddress.getLocalHost();
            int port = 4444; // RFC867 specifies this port.
            if (args.length > 0)
                port = Integer.parseInt(args[0]);

            // The port we'll listen on
            SocketAddress localport = new InetSocketAddress(port);

            // Create and bind a tcp channel to listen for connections on.
            ServerSocketChannel tcpserver = ServerSocketChannel.open();
            tcpserver.socket().bind(localport);

            // Also create and bind a DatagramChannel to listen on.
            DatagramChannel udpserver = DatagramChannel.open();
            udpserver.socket().bind(localport);

            // Specify non-blocking mode for both channels, since our
            // Selector object will be doing the blocking for us.
            tcpserver.configureBlocking(false);
            udpserver.configureBlocking(false);

            // The Selector object is what allows us to block while waiting
            // for activity on either of the two channels.
            Selector selector = Selector.open();

            // Register the channels with the selector, and specify what
            // conditions (a connection ready to accept, a datagram ready
            // to read) we'd like the Selector to wake up for.
            // These methods return SelectionKey objects, which we don't
            // need to retain in this example.
            //tcpserver.register(selector, SelectionKey.OP_ACCEPT);
            tcpserver.register(selector, tcpserver.validOps());
            // tcpserver.register(selector, SelectionKey.OP_WRITE);
            udpserver.register(selector, SelectionKey.OP_READ);
            // udpserver.register(selector, SelectionKey.OP_WRITE);

            // This is an empty byte buffer to receive emtpy datagrams with.
            // If a datagram overflows the receive buffer size, the extra bytes
            // are automatically discarded, so we don't have to worry about
            // buffer overflow attacks here.
            ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);

            // Now loop forever, processing client connections
            for (;;) {
                try { // Handle per-connection problems below
                        // Wait for a client to connect
                    selector.select();

                    // If we get here, a client has probably connected, so
                    // put our response into a ByteBuffer.
                    // String date = new java.util.Date().toString() + "\r\n";
                    // ByteBuffer response = encoder.encode(CharBuffer.wrap(date));
                    // Get the SelectionKey objects for the channels that have
                    // activity on them. These are the keys returned by the
                    // register() methods above. They are returned in a
                    // java.util.Set.
                    Set keys = selector.selectedKeys();

                    // Iterate through the Set of keys.
                    for (Iterator i = keys.iterator(); i.hasNext();) {
                        // Get a key from the set, and remove it from the set
                        SelectionKey key = (SelectionKey) i.next();
                        i.remove();

                        // Get the channel associated with the key
                        Channel c = (Channel) key.channel();

                        // Now test the key and the channel to find out
                        // whether something happend on the TCP or UDP channel
                        if (key.isAcceptable() && c == tcpserver) {
                            try {
                            // A client has attempted to connect via TCP.
                            // Accept the connection now.
                            SocketChannel client = tcpserver.accept();
                            // If we accepted the connection successfully,
                            // the send our respone back to the client.
                            client.read(receiveBuffer);
                            receiveBuffer.flip();
                            String clientMessage = ByteBufferToString(receiveBuffer);
                            receiveBuffer.clear();
                            decodeCommand(new String(clientMessage));
                            JSONObject jsonObj = new JSONObject(clientMessage);
                            String type = jsonObj.getString("type");
                            System.out.println(clientMessage);
                            if (client != null) {
                                String topic = jsonObj.getString("topic");

                                if (type.equalsIgnoreCase("sub")) {
                                    
                                    Subscriber subscriber = new SubscriberImpl();
                                    subscriber.addSubscriber(topic, pubSubService);                                    
                                    
                                    PubSubService.instance().getMessagesForSubscriberOfTopic(topic);

                                    String message = subscriber.getMessages().toString();
                                    ByteBuffer response = ByteBuffer.allocate(1024);
                                    response.put(message.getBytes());
                                    response.flip();
                                    client.write(response);
                                    response.clear();
                                    System.out.println("response : " + response);
                                    System.out.println("Adicionado sub.");
                                    
                                }else if(type.equalsIgnoreCase("pub")) {
                                    JSONObject payload = jsonObj.getJSONObject("payload");
                                    Publisher publisher = new PublisherImpl();
                                    Message message = new Message(topic, payload);
                                    publisher.publish(message, pubSubService);
                                    System.out.println(message.getPayload().toString());
                                    
                                    if (!PubSubService.instance().getSubscribers(topic).isEmpty()) {

                                        for (Subscriber sub : PubSubService.instance().getSubscribers(topic)) {
                                            ByteBuffer response = ByteBuffer.allocate(1024);
                                            PubSubService.instance().getMessagesForSubscriberOfTopic(topic);
                                            String respon = sub.getMessages().toString();
                                            response.put(respon.getBytes());
                                            response.flip();
                                            client.write(response);
                                            response.clear();
                                            System.out.println("Dentro do for: " + respon);
                                        }
                                    }
                                }
                                // client.write(response); // send respone
                                client.close(); // close connection
                            }
                            }catch (Exception e) {
                                System.out.println("Erro no client TCP");
                            }
                        } else if (key.isReadable() && c == udpserver) {
                            // A UDP datagram is waiting. Receive it now,
                            // noting the address it was sent from.
                            SocketAddress clientAddress = udpserver.receive(receiveBuffer);    

                            receiveBuffer.flip();
                            String clientMessage = ByteBufferToString(receiveBuffer);
                            decodeCommand(new String(clientMessage));
                            JSONObject jsonObj = new JSONObject(clientMessage);
                            String type = jsonObj.getString("type");

                            // If we got the datagram successfully, send
                            // the date and time in a response packet.
                            if (clientAddress != null) {

                                String topic = jsonObj.getString("topic");

                                if (type.equalsIgnoreCase("sub")) {

                                    Subscriber subscriber = new SubscriberImpl(clientAddress);
                                    subscriber.addSubscriber(topic, pubSubService);

                                    PubSubService.instance().getMessagesForSubscriberOfTopic(topic);

                                    String message = subscriber.getMessages().toString();
                                    System.out.println("message:" + message);
                                    ByteBuffer response = ByteBuffer.allocate(1024);
                                    response.put(message.getBytes());
                                    response.flip();
                                    udpserver.send(response, subscriber.getClientAddress());
                                    response.clear();

                                } else if (type.equalsIgnoreCase("pub")) {

                                    JSONObject payload = jsonObj.getJSONObject("payload");
                                    Publisher publisher = new PublisherImpl();
                                    Message message = new Message(topic, payload);
                                    publisher.publish(message, pubSubService);
                                    System.out.println(message.getPayload().toString());

                                    if (!PubSubService.instance().getSubscribers(topic).isEmpty()) {

                                        for (Subscriber sub : PubSubService.instance().getSubscribers(topic)) {
                                            ByteBuffer response = ByteBuffer.allocate(1024);
                                            PubSubService.instance().getMessagesForSubscriberOfTopic(topic);
                                            String respon = sub.getMessages().toString();
                                            response.put(respon.getBytes());
                                            response.flip();
                                            udpserver.send(response, sub.getClientAddress());
                                            response.clear();
                                            System.out.println("Dentro do for: " + respon);
                                        }
                                    }
                                } else if (type.equalsIgnoreCase("unsub")) {                                    
                                    if (!PubSubService.instance().getSubscribers(topic).isEmpty()) {
                                        for (Subscriber sub : PubSubService.instance().getSubscribers(topic)) {
                                            //clientAddress.holder().addr().holder().adress
                                            //TODO: comparação não funciona!!
                                            if(sub.getClientAddress() == clientAddress) {
                                                sub.unSubscribe(topic);
                                                break;
                                            }
                                        }
                                    }                                    
                                    
                                }

                                receiveBuffer.clear();
                            }
                        }
                    }
                } catch (java.io.IOException e) {
                    // This is a (hopefully transient) problem with a single
                    // connection: we log the error, but continue running.
                    // We use our classname for the logger so that a sysadmin
                    // can configure logging for this server independently
                    // of other programs.
                    Logger l = Logger.getLogger(NIOServer.class.getName());
                    l.log(Level.WARNING, "IOException in DaytimeServer", e);
                } catch (Throwable t) {
                    // If anything else goes wrong (out of memory, for example)
                    // then log the problem and exit.
                    Logger l = Logger.getLogger(NIOServer.class.getName());
                    l.log(Level.SEVERE, "FATAL error in DaytimeServer", t);
                    System.exit(1);
                }
            }
        } catch (Exception e) {
            // This is a startup error: there is no need to log it;
            // just print a message and exit
            System.err.println(e);
            System.exit(1);
        }
    }

    private static String ByteBufferToString(ByteBuffer buffer) {
        int limit = buffer.limit();
        byte bytes[] = new byte[limit];
        buffer.get(bytes, 0, limit);
        String s = new String(bytes);
        return s;
    }

    private static void decodeCommand(String jsonString) throws JSONException {
        JSONObject jsonObject = new JSONObject(jsonString);
        System.out.println(jsonObject);
    }
}

 

Share this post


Link to post
Share on other sites
HappyHippyHippo

não percebo a dificuldade de receber por um canal e enviar por outro


IRC : sim, é algo que ainda existe >> #p@p

Share this post


Link to post
Share on other sites
ppulga22

Vou tentar explicar-me melhor. Esquecendo os canais por agora. Temos uma class Service, com um :

Map<String, Set<Subscriber>> subscribersTopicMap = new HashMap<String, Set<Subscriber>>();

Estamos a adicionar subscribers via UDP:

Subscriber subscriber = new SubscriberImpl(clientIP, clientport);

E também TCP:

Subscriber subscriber = new SubscriberImpl(new ClientThread(clientSocket, true));

O que eu queria, era que qualquer um dos servers ao receber o comando pub, fosse capaz de reencaminhar as mensagens para ambos os tipos de clientes, UDP e TCP. E não sei ao certo como proceder, sendo que, por UDP, estamos a usar packets:

for(Subscriber sub : _subscribers) {
		        			sub.getMessagesForSubscriberOfTopic(topic);
			        		
			        		String response = sub.getMessages().toString();
				        	sendData = response.getBytes();
						
						  DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
						  clientIP, sub.getClientport()); 
						  udpServerSocket.send(sendPacket);
				        	
		        	}

E no lado do TCP:

for(Subscriber sub : PubSubService.instance().getSubscribers(topic)) {
				    	
				    	sub.getMessagesForSubscriberOfTopic(topic);
				    	String response = sub.getMessages().toString();
				    	os.println(response);
				    	
						
						  if(sub.getClientThread() != null) { 
						  sub.getClientThread().sendMessage(message); } 
			      }

Onde o sendMessage:

 public void sendMessage(Message message){
		  try {
			  String response;
		      os = new PrintStream(clientSocket.getOutputStream());
		      response = "Topic -> " + message.getTopic() + " : " + message.getPayload();
		    	os.println(response);			  
		  }
		  catch(Exception e){
			  
		  }
	  }

Acho que é isto, em qualquer um dos lados, quero conseguir chegar a ambos os protocolos, porque para já, com o servidor UDP só chego aos clientes UDP e o mesmo para o TCP.

PS: Os canais era para ver se me facilitava o problema, mas ainda não aprofundei o assunto.

Obrigado e um abraço ;) !!

Edited by ppulga22
Para completar o tópico.

Share this post


Link to post
Share on other sites

Create an account or sign in to comment

You need to be a member in order to leave a comment

Create an account

Sign up for a new account in our community. It's easy!

Register a new account

Sign in

Already have an account? Sign in here.

Sign In Now

×
×
  • Create New...

Important Information

By using this site you accept our Terms of Use and Privacy Policy. We have placed cookies on your device to help make this website better. You can adjust your cookie settings, otherwise we'll assume you're okay to continue.