ppulga22 Posted July 28, 2019 at 08:11 PM Report #615805 Posted July 28, 2019 at 08:11 PM Olá pessoal. Estou a desenvolver um projeto, e empanquei aqui num detalhe e não estou a ver uma boa maneira de resolver isto. Bem, o projeto consiste em implementar o padrão Publisher Subscriber, a correr via TCP e UDP. Ora bem, eu tenho uma class central, onde armazenamos as mensagens, tópicos e subscribers, e onde estão implementados os métodos principais. Depois, tenho uma class UDPServer, e TCPServer, assim como os clientes. Como estou a trabalhar com singleton na class central, criei uma nova class Driver, só para arrancar ambos os servidores num main(). Posto isto, o meu problema está em partilhar as mensagens, o sistema está a funcionar muito bem, ambos os lados adicionam os subs, e as mensagens publicadas, o problema está em fazer o broadcast. Ou seja, nós recebemos uma mensagem do cliente, do tipo, {"type": pub, "topic": music, "payload": {"name": Valete, "álbum": Serviço Publico, "ano": 2006}}. Aqui o Server vai descodificar a mensagem e enviar para os subs do tópico music. Mas, se receber a mensagem do lado do Server TCP, não consigo enviá-la para o cliente UDP e vice versa. Alguma sugestão? package com.machado.filipe.pubsub.server; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketException; import java.util.HashSet; import java.util.List; import java.util.Set; import org.json.JSONException; import org.json.JSONObject; import com.machado.filipe.pubsub.Message; import com.machado.filipe.pubsub.publisher.Publisher; import com.machado.filipe.pubsub.publisher.PublisherImpl; import com.machado.filipe.pubsub.service.PubSubService; import com.machado.filipe.pubsub.subscriber.Subscriber; import com.machado.filipe.pubsub.subscriber.SubscriberImpl; public class UDPServer { static PubSubService pubSubService; public void run() throws Exception { int serverport = 7777; DatagramSocket udpServerSocket = new DatagramSocket(serverport); System.out.println("Server started...\n"); while(true) { byte[] receiveData = new byte[1024]; DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length); udpServerSocket.receive(receivePacket); String clientMessage = (new String(receivePacket.getData())).trim(); InetAddress clientIP = receivePacket.getAddress(); int clientport = receivePacket.getPort(); byte[] sendData = new byte[1024]; decodeCommand(new String(clientMessage)); JSONObject jsonObj = new JSONObject(clientMessage); String type = jsonObj.getString("type"); if(type.equalsIgnoreCase("pub")) { String topic = jsonObj.getString("topic"); JSONObject payload = jsonObj.getJSONObject("payload"); Publisher publisher = new PublisherImpl(); Message message = new Message(topic, payload); publisher.publish(message, pubSubService); Set<Subscriber> _subscribers = PubSubService.instance().getSubscribers(topic); for(Subscriber sub : _subscribers) { sub.getMessagesForSubscriberOfTopic(topic); String response = sub.getMessages().toString(); sendData = response.getBytes(); DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, clientIP, sub.getClientport()); udpServerSocket.send(sendPacket); } }else if (type.equalsIgnoreCase("sub")) { String topic = jsonObj.getString("topic"); Subscriber subscriber = new SubscriberImpl(clientIP, clientport); subscriber.addSubscriber(topic, pubSubService); subscriber.getMessagesForSubscriberOfTopic(topic); String response = subscriber.getMessages().toString(); sendData = response.getBytes(); DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, clientIP, clientport); udpServerSocket.send(sendPacket); }else if (type.equalsIgnoreCase("unsub")) { String topic = jsonObj.getString("topic"); for (Subscriber sub : PubSubService.instance().getSubscribers(topic)) { if(sub.getClientport() == clientport && sub.getClientIP() == clientIP) { sub.unSubscribe(topic); } } } } } private static void decodeCommand(String jsonString) throws JSONException { JSONObject jsonObject = new JSONObject(jsonString); System.out.println(jsonObject); } } Deixo esta classe para verem como está o server UDP, se precisarem de mais informação ou código, digam algo 😄😄 Obrigado
ppulga22 Posted July 28, 2019 at 08:19 PM Author Report #615806 Posted July 28, 2019 at 08:19 PM Tenho em alternativa a utilização de canais, mas não sei se me vai facilitar o processo. Deixo aqui o Server de ambos os protocolos. package com.machado.filipe.pubsub.server; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.Channel; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import java.util.Iterator; import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; import org.json.JSONException; import org.json.JSONObject; import com.machado.filipe.pubsub.Message; import com.machado.filipe.pubsub.publisher.Publisher; import com.machado.filipe.pubsub.publisher.PublisherImpl; import com.machado.filipe.pubsub.service.PubSubService; import com.machado.filipe.pubsub.subscriber.Subscriber; import com.machado.filipe.pubsub.subscriber.SubscriberImpl; /** * A more robust daytime service, that handles TCP and UDP connections and * provides exception handling and error logging. */ public class NIOServer { public static void main(String args[]) { try { PubSubService pubSubService = new PubSubService(); // Handle startup exceptions at the end of this block // Get an encoder for converting strings to bytes CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder(); // Allow an alternative port for testing with non-root accounts InetAddress hostIP = InetAddress.getLocalHost(); int port = 4444; // RFC867 specifies this port. if (args.length > 0) port = Integer.parseInt(args[0]); // The port we'll listen on SocketAddress localport = new InetSocketAddress(port); // Create and bind a tcp channel to listen for connections on. ServerSocketChannel tcpserver = ServerSocketChannel.open(); tcpserver.socket().bind(localport); // Also create and bind a DatagramChannel to listen on. DatagramChannel udpserver = DatagramChannel.open(); udpserver.socket().bind(localport); // Specify non-blocking mode for both channels, since our // Selector object will be doing the blocking for us. tcpserver.configureBlocking(false); udpserver.configureBlocking(false); // The Selector object is what allows us to block while waiting // for activity on either of the two channels. Selector selector = Selector.open(); // Register the channels with the selector, and specify what // conditions (a connection ready to accept, a datagram ready // to read) we'd like the Selector to wake up for. // These methods return SelectionKey objects, which we don't // need to retain in this example. //tcpserver.register(selector, SelectionKey.OP_ACCEPT); tcpserver.register(selector, tcpserver.validOps()); // tcpserver.register(selector, SelectionKey.OP_WRITE); udpserver.register(selector, SelectionKey.OP_READ); // udpserver.register(selector, SelectionKey.OP_WRITE); // This is an empty byte buffer to receive emtpy datagrams with. // If a datagram overflows the receive buffer size, the extra bytes // are automatically discarded, so we don't have to worry about // buffer overflow attacks here. ByteBuffer receiveBuffer = ByteBuffer.allocate(1024); // Now loop forever, processing client connections for (;;) { try { // Handle per-connection problems below // Wait for a client to connect selector.select(); // If we get here, a client has probably connected, so // put our response into a ByteBuffer. // String date = new java.util.Date().toString() + "\r\n"; // ByteBuffer response = encoder.encode(CharBuffer.wrap(date)); // Get the SelectionKey objects for the channels that have // activity on them. These are the keys returned by the // register() methods above. They are returned in a // java.util.Set. Set keys = selector.selectedKeys(); // Iterate through the Set of keys. for (Iterator i = keys.iterator(); i.hasNext();) { // Get a key from the set, and remove it from the set SelectionKey key = (SelectionKey) i.next(); i.remove(); // Get the channel associated with the key Channel c = (Channel) key.channel(); // Now test the key and the channel to find out // whether something happend on the TCP or UDP channel if (key.isAcceptable() && c == tcpserver) { try { // A client has attempted to connect via TCP. // Accept the connection now. SocketChannel client = tcpserver.accept(); // If we accepted the connection successfully, // the send our respone back to the client. client.read(receiveBuffer); receiveBuffer.flip(); String clientMessage = ByteBufferToString(receiveBuffer); receiveBuffer.clear(); decodeCommand(new String(clientMessage)); JSONObject jsonObj = new JSONObject(clientMessage); String type = jsonObj.getString("type"); System.out.println(clientMessage); if (client != null) { String topic = jsonObj.getString("topic"); if (type.equalsIgnoreCase("sub")) { Subscriber subscriber = new SubscriberImpl(); subscriber.addSubscriber(topic, pubSubService); PubSubService.instance().getMessagesForSubscriberOfTopic(topic); String message = subscriber.getMessages().toString(); ByteBuffer response = ByteBuffer.allocate(1024); response.put(message.getBytes()); response.flip(); client.write(response); response.clear(); System.out.println("response : " + response); System.out.println("Adicionado sub."); }else if(type.equalsIgnoreCase("pub")) { JSONObject payload = jsonObj.getJSONObject("payload"); Publisher publisher = new PublisherImpl(); Message message = new Message(topic, payload); publisher.publish(message, pubSubService); System.out.println(message.getPayload().toString()); if (!PubSubService.instance().getSubscribers(topic).isEmpty()) { for (Subscriber sub : PubSubService.instance().getSubscribers(topic)) { ByteBuffer response = ByteBuffer.allocate(1024); PubSubService.instance().getMessagesForSubscriberOfTopic(topic); String respon = sub.getMessages().toString(); response.put(respon.getBytes()); response.flip(); client.write(response); response.clear(); System.out.println("Dentro do for: " + respon); } } } // client.write(response); // send respone client.close(); // close connection } }catch (Exception e) { System.out.println("Erro no client TCP"); } } else if (key.isReadable() && c == udpserver) { // A UDP datagram is waiting. Receive it now, // noting the address it was sent from. SocketAddress clientAddress = udpserver.receive(receiveBuffer); receiveBuffer.flip(); String clientMessage = ByteBufferToString(receiveBuffer); decodeCommand(new String(clientMessage)); JSONObject jsonObj = new JSONObject(clientMessage); String type = jsonObj.getString("type"); // If we got the datagram successfully, send // the date and time in a response packet. if (clientAddress != null) { String topic = jsonObj.getString("topic"); if (type.equalsIgnoreCase("sub")) { Subscriber subscriber = new SubscriberImpl(clientAddress); subscriber.addSubscriber(topic, pubSubService); PubSubService.instance().getMessagesForSubscriberOfTopic(topic); String message = subscriber.getMessages().toString(); System.out.println("message:" + message); ByteBuffer response = ByteBuffer.allocate(1024); response.put(message.getBytes()); response.flip(); udpserver.send(response, subscriber.getClientAddress()); response.clear(); } else if (type.equalsIgnoreCase("pub")) { JSONObject payload = jsonObj.getJSONObject("payload"); Publisher publisher = new PublisherImpl(); Message message = new Message(topic, payload); publisher.publish(message, pubSubService); System.out.println(message.getPayload().toString()); if (!PubSubService.instance().getSubscribers(topic).isEmpty()) { for (Subscriber sub : PubSubService.instance().getSubscribers(topic)) { ByteBuffer response = ByteBuffer.allocate(1024); PubSubService.instance().getMessagesForSubscriberOfTopic(topic); String respon = sub.getMessages().toString(); response.put(respon.getBytes()); response.flip(); udpserver.send(response, sub.getClientAddress()); response.clear(); System.out.println("Dentro do for: " + respon); } } } else if (type.equalsIgnoreCase("unsub")) { if (!PubSubService.instance().getSubscribers(topic).isEmpty()) { for (Subscriber sub : PubSubService.instance().getSubscribers(topic)) { //clientAddress.holder().addr().holder().adress //TODO: comparação não funciona!! if(sub.getClientAddress() == clientAddress) { sub.unSubscribe(topic); break; } } } } receiveBuffer.clear(); } } } } catch (java.io.IOException e) { // This is a (hopefully transient) problem with a single // connection: we log the error, but continue running. // We use our classname for the logger so that a sysadmin // can configure logging for this server independently // of other programs. Logger l = Logger.getLogger(NIOServer.class.getName()); l.log(Level.WARNING, "IOException in DaytimeServer", e); } catch (Throwable t) { // If anything else goes wrong (out of memory, for example) // then log the problem and exit. Logger l = Logger.getLogger(NIOServer.class.getName()); l.log(Level.SEVERE, "FATAL error in DaytimeServer", t); System.exit(1); } } } catch (Exception e) { // This is a startup error: there is no need to log it; // just print a message and exit System.err.println(e); System.exit(1); } } private static String ByteBufferToString(ByteBuffer buffer) { int limit = buffer.limit(); byte bytes[] = new byte[limit]; buffer.get(bytes, 0, limit); String s = new String(bytes); return s; } private static void decodeCommand(String jsonString) throws JSONException { JSONObject jsonObject = new JSONObject(jsonString); System.out.println(jsonObject); } }
HappyHippyHippo Posted July 30, 2019 at 08:41 PM Report #615829 Posted July 30, 2019 at 08:41 PM não percebo a dificuldade de receber por um canal e enviar por outro IRC : sim, é algo que ainda existe >> #p@p Portugol Plus
ppulga22 Posted August 1, 2019 at 05:40 PM Author Report #615865 Posted August 1, 2019 at 05:40 PM Vou tentar explicar-me melhor. Esquecendo os canais por agora. Temos uma class Service, com um : Map<String, Set<Subscriber>> subscribersTopicMap = new HashMap<String, Set<Subscriber>>(); Estamos a adicionar subscribers via UDP: Subscriber subscriber = new SubscriberImpl(clientIP, clientport); E também TCP: Subscriber subscriber = new SubscriberImpl(new ClientThread(clientSocket, true)); O que eu queria, era que qualquer um dos servers ao receber o comando pub, fosse capaz de reencaminhar as mensagens para ambos os tipos de clientes, UDP e TCP. E não sei ao certo como proceder, sendo que, por UDP, estamos a usar packets: for(Subscriber sub : _subscribers) { sub.getMessagesForSubscriberOfTopic(topic); String response = sub.getMessages().toString(); sendData = response.getBytes(); DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, clientIP, sub.getClientport()); udpServerSocket.send(sendPacket); } E no lado do TCP: for(Subscriber sub : PubSubService.instance().getSubscribers(topic)) { sub.getMessagesForSubscriberOfTopic(topic); String response = sub.getMessages().toString(); os.println(response); if(sub.getClientThread() != null) { sub.getClientThread().sendMessage(message); } } Onde o sendMessage: public void sendMessage(Message message){ try { String response; os = new PrintStream(clientSocket.getOutputStream()); response = "Topic -> " + message.getTopic() + " : " + message.getPayload(); os.println(response); } catch(Exception e){ } } Acho que é isto, em qualquer um dos lados, quero conseguir chegar a ambos os protocolos, porque para já, com o servidor UDP só chego aos clientes UDP e o mesmo para o TCP. PS: Os canais era para ver se me facilitava o problema, mas ainda não aprofundei o assunto. Obrigado e um abraço 😉 !!
Recommended Posts
Create an account or sign in to comment
You need to be a member in order to leave a comment
Create an account
Sign up for a new account in our community. It's easy!
Register a new accountSign in
Already have an account? Sign in here.
Sign In Now