
Lesson 18 - Java Chat - Client - Server Connection Part 2

In the previous lesson, Java Chat - Client - Server Connection Part 1, we created an interface for a client communicator, among other things.

In today's Java tutorial we're going to implement the communicator.

Client communicator

In the service package, we'll create a new ClientCommunicationService class and let it implement the IClientCommunicationService interface:

public class ClientCommunicationService implements IClientCommunicationService {

}

Variables and Constants

We'll add the following variables and constants to the class:

private final ObjectProperty<Socket> socket = new SimpleObjectProperty<>(this, "socket", null);
private final ReadOnlyObjectWrapper<ConnectionState> connectionState = new ReadOnlyObjectWrapper<>(this, "connectionState", ConnectionState.DISCONNECTED);
private final HashMap<String, List<OnDataReceivedListener>> listeners = new HashMap<>();
private final StringProperty host = new SimpleStringProperty(this, "host", null);
private final IntegerProperty port = new SimpleIntegerProperty(this, "port", -1);
private final StringProperty connectedServerName = new SimpleStringProperty(this, "connectedServerName", null);
private final ObjectProperty<ServerStatus> serverStatus = new SimpleObjectProperty<>(this, "serverStatus", ServerStatus.EMPTY);
private final Queue<Request> requests = new LinkedBlockingQueue<>();

private ReaderThread readerThread;
private WriterThread writerThread;

Most of these fields are self-explanatory, so we won't comment on each of them. The socket field is worth mentioning: it's wrapped in an ObjectProperty, which lets us observe changes to its value. The requests queue is also interesting; we'll use it to implement request-response communication. We'll create the Request class later. The readerThread and writerThread fields will hold the reader and writer threads. We won't initialize them until we try to create a new connection.

Constructor

The class constructor won't require any parameters. In it, we'll register a listener on the socket and bind the server name, which will have the format "host:port":

public ClientCommunicationService() {
    socket.addListener(this::socketListener);
    connectedServerName.bind(Bindings.createStringBinding(() -> String.format("%s:%d", host.get(), port.get()), host, port, connectionState));
}

Socket Status Change Listener

We'll create a private socketListener() method, which we registered in the constructor. In this method we'll initialize/cancel the reader/writer thread:

private void socketListener(ObservableValue<? extends Socket> observableValue, Socket oldSocket, Socket newSocket) {
    if (newSocket == null) {
        readerThread = null;
        writerThread = null;
        unregisterMessageObserver(ServerStatusMessage.MESSAGE_TYPE, this.serverStatusListener);
        return;
    }

    try {
        readerThread = new ReaderThread(newSocket.getInputStream(), listener, this::disconnect);
        writerThread = new WriterThread(newSocket.getOutputStream(), this::disconnect);

        readerThread.start();
        writerThread.start();
        registerMessageObserver(ServerStatusMessage.MESSAGE_TYPE, this.serverStatusListener);
    } catch (IOException e) {
        System.out.println("Server communication error occurred.");
    }
}

The method consists of two parts. The first part handles the case when the connection was terminated for some reason and it's necessary to remove the old reader/writer thread instances. The rest of the method assumes that the newSocket variable contains a new valid socket based on the newly created connection. New instances of the reader/writer thread are created and started by start(). We'll explain the (un)registerMessageObserver() method when implementing it.

Delegating Received Messages Processing

We'll add another instance constant to the class, holding a lambda expression that forwards received messages to the registered observers:

private final OnDataReceivedListener listener = message -> {
    if (message.isResponce()) {
        final Request poll = requests.poll();
        if (poll != null) {
            poll.onResponce(message);
        }
        return;
    }

    final List<OnDataReceivedListener> listenerList = listeners.get(message.getType());
    if (listenerList == null) {
        return;
    }

    for (OnDataReceivedListener listener : listenerList) {
        listener.onDataReceived(message);
    }
};

At the beginning of the lambda, we check whether the received message is a response to a request. If so, we take the oldest pending request from the requests queue and call its onResponce() handler with the message. If it's a regular message, we look up the listeners registered for its type in the listeners map and let each of them process it.

Subscribe/Unsubscribe To Messages

Other methods that we need to implement according to the interface are methods for subscribing and unsubscribing. These methods will modify the listeners map:

@Override
public synchronized void registerMessageObserver(String messageType, OnDataReceivedListener listener) {
    List<OnDataReceivedListener> listenerList = listeners.computeIfAbsent(messageType, k -> new ArrayList<>());

    listenerList.add(listener);
}

@Override
public synchronized void unregisterMessageObserver(String messageType, OnDataReceivedListener listener) {
    List<OnDataReceivedListener> listenerList = listeners.get(messageType);
    if (listenerList == null) {
        return;
    }

    listenerList.remove(listener);
}

When registering the listener, we use the computeIfAbsent() method, which returns the list stored under the given key and, if the key has no value yet, creates and stores a new list first.
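The subscribe, unsubscribe and dispatch steps described above can be sketched in isolation. This is only a minimal illustration: the ListenerRegistrySketch class, its method names and the use of Consumer<String> instead of OnDataReceivedListener are assumptions made so the example runs without the rest of the project.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical registry: message types map to lists of listeners, messages are plain strings.
class ListenerRegistrySketch {

    private final Map<String, List<Consumer<String>>> listeners = new HashMap<>();

    synchronized void register(String type, Consumer<String> listener) {
        // Creates the list on first use, otherwise returns the existing one.
        listeners.computeIfAbsent(type, k -> new ArrayList<>()).add(listener);
    }

    synchronized void unregister(String type, Consumer<String> listener) {
        List<Consumer<String>> list = listeners.get(type);
        if (list != null) {
            list.remove(listener);
        }
    }

    // Delivers the payload to every listener registered for the type
    // and returns how many listeners were called.
    synchronized int dispatch(String type, String payload) {
        List<Consumer<String>> list = listeners.get(type);
        if (list == null) {
            return 0;
        }
        // Iterate over a copy so a listener may unregister itself while being called.
        List<Consumer<String>> snapshot = new ArrayList<>(list);
        for (Consumer<String> listener : snapshot) {
            listener.accept(payload);
        }
        return snapshot.size();
    }
}
```

Unlike the tutorial's code, this sketch also synchronizes the dispatch, which is one possible way to keep the map consistent when messages arrive on a different thread than the one that registers listeners.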

Establishing the Connection

Finally, we come to the most important methods of the whole communicator. Let's start implementing the connect() method. We're going to use the CompletableFuture class for the first time:

@Override
public CompletableFuture<Void> connect(String host, int port) {
    if (isConnected()) {
        throw new RuntimeException("The connection already exists.");
    }

    changeState(ConnectionState.CONNECTING);

    return CompletableFuture.supplyAsync(() -> {
        // Runs on a worker thread; null signals that the connection attempt failed.
        final Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), 3000);
            return socket;
        } catch (IOException e) {
            return null;
        }
    }, ThreadPool.COMMON_EXECUTOR)
        .thenApplyAsync(socket -> {
            // Runs on the JavaFX thread because it updates observable properties.
            this.socket.set(socket);
            if (socket == null) {
                changeState(ConnectionState.DISCONNECTED);
                this.host.set(null);
                this.port.set(-1);
                throw new RuntimeException("Unable to create the connection.");
            }

            this.host.set(host);
            this.port.set(port);
            return null;
        }, ThreadPool.JAVAFX_EXECUTOR);
}

The method is again divided into logical parts. In the first part we check whether we're already connected to the server. If so, we throw an exception. Because RuntimeException is unchecked, we don't even have to handle it; it'll just be written to our console. The important thing is that the application won't close. The changeState() method tells the others that we're trying to connect to the server.

In the second part of the method we create a future in which we try to establish a connection with the server by calling the socket.connect() method with a timeout of 3000 ms. Passing the ThreadPool.COMMON_EXECUTOR constant makes the connection attempt run on a separate thread. If we connect to the server successfully, we return the socket; otherwise we return null. The thenApplyAsync() method then transforms the socket into the result of the whole future.

In the third part we store the socket by calling this.socket.set(socket). Among other things, this triggers the socketListener() method we registered in the constructor, which creates or discards the reader and writer threads. The whole third part must run on the JavaFX thread. That's because we'll later bind graphical components to some of the observable properties, and we already know these can only be updated on the JavaFX thread, otherwise an exception will be thrown.
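The two-stage pattern used in connect() can be tried out without a socket or JavaFX. The following is a minimal sketch under stated assumptions: the TwoStageFutureSketch class is hypothetical, and the single-thread executors named "worker" and "ui" merely stand in for ThreadPool.COMMON_EXECUTOR and ThreadPool.JAVAFX_EXECUTOR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: slow work on a worker thread, state updates on a dedicated "ui" thread.
class TwoStageFutureSketch {

    static String run(boolean succeed) {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        try {
            return CompletableFuture
                    // Stage 1: the blocking operation; null signals failure, as in connect().
                    .supplyAsync(() -> succeed ? Thread.currentThread().getName() : null, worker)
                    // Stage 2: runs on the second executor and turns a failure into an exception.
                    .thenApplyAsync(stage1Thread -> {
                        if (stage1Thread == null) {
                            throw new RuntimeException("Unable to create the connection.");
                        }
                        return stage1Thread + " -> " + Thread.currentThread().getName();
                    }, ui)
                    // The caller decides how a failure is reported.
                    .exceptionally(error -> {
                        Throwable cause = error.getCause() != null ? error.getCause() : error;
                        return "failed: " + cause.getMessage();
                    })
                    .join();
        } finally {
            worker.shutdown();
            ui.shutdown();
        }
    }
}
```

Calling TwoStageFutureSketch.run(true) returns "worker -> ui", which shows that each stage ran on the executor it was given. With run(false) the exception thrown in the second stage reaches the exceptionally() handler, which is one way a caller of connect() could react to a failed connection.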

Terminating the Connection

We'll terminate the connection with a disconnect() method. The method's goal will be to terminate the reader/writer thread properly:

public CompletableFuture<Boolean> disconnect() {
    if (!isConnected()) {
        return CompletableFuture.completedFuture(false);
    }

    return CompletableFuture.supplyAsync(() -> {
        try {
            socket.get().close();
            readerThread.shutdown();
            writerThread.shutdown();
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }

        return true;
    }, ThreadPool.COMMON_EXECUTOR)
        .thenApplyAsync(success -> {
            if (success) {
                this.socket.set(null);
                changeState(ConnectionState.DISCONNECTED);
            }

            return success;
        }, ThreadPool.JAVAFX_EXECUTOR);
}

If the connection is terminated successfully, the this.socket.set(null) call discards the reader and writer threads (through the socketListener() method) and the communicator enters the DISCONNECTED state.

Sending Messages

We're going to send two types of messages:

  • not waiting for the result
  • waiting for the result

The method that doesn't wait for a result is very simple. It takes the message, passes it to the writer thread, and doesn't care about it anymore:

public synchronized void sendMessage(IMessage message) {
    if (writerThread != null) {
        writerThread.addMessageToQueue(message);
    }
}
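To make the "pass it to the writer thread" step more concrete, here's a minimal sketch of what a queue-backed writer could look like. It is not the WriterThread class used in this course: the QueueWriterSketch name, the String messages, the STOP sentinel and the shutdownAndWait() method are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a queue-backed writer thread; messages are plain strings here.
class QueueWriterSketch extends Thread {

    // Sentinel compared by identity; it stops the loop once everything before it was written.
    private static final String STOP = new String("STOP");

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> written = Collections.synchronizedList(new ArrayList<>());

    // Called by the sender; returns immediately without waiting for the write.
    void addMessageToQueue(String message) {
        queue.add(message);
    }

    @Override
    public void run() {
        try {
            while (true) {
                String message = queue.take(); // blocks until a message is available
                if (message == STOP) {
                    return;
                }
                written.add(message); // a real writer would write to the socket's output stream
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Asks the thread to finish, waits for it and returns everything it has written.
    List<String> shutdownAndWait() {
        queue.add(STOP);
        try {
            join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(written);
    }
}
```

The point of the design is that the caller only appends to a queue, so sending never blocks on network I/O.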

Sending a message that waits for a result raises one problem we need to solve, namely waiting for the server's response:

public synchronized CompletableFuture<IMessage> sendMessageFuture(IMessage message) {
    return CompletableFuture.supplyAsync(() -> {
        sendMessage(message);
        return null;
    })
    .thenCompose(ignored -> {
        Request request = new Request();
        requests.add(request);
        return request.getFuture();
    });
}

The method returns a future that will eventually hold the response. First the message is sent as usual, then the new thenCompose() method is called. This method basically says that the result of the returned future comes from another CompletableFuture instance. We get this other instance by calling the getFuture() method on a Request instance; we'll declare that class in a moment. Note that the Request is added to the queue only after the message has been handed to the writer thread, so the code relies on the response not arriving before the Request is enqueued.
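The difference between thenApply() and thenCompose() is easiest to see side by side. In this minimal sketch the ComposeSketch class and its responseFuture() helper are hypothetical; the helper only stands in for request.getFuture().

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch contrasting thenApply() and thenCompose().
class ComposeSketch {

    // Stands in for request.getFuture(): a future that yields the response.
    static CompletableFuture<String> responseFuture(String response) {
        return CompletableFuture.supplyAsync(() -> response);
    }

    // thenApply() wraps the inner future, producing a future of a future.
    static CompletableFuture<CompletableFuture<String>> nested() {
        return CompletableFuture.runAsync(() -> { /* send the message here */ })
                .thenApply(ignored -> responseFuture("pong"));
    }

    // thenCompose() flattens it: the outer future completes with the inner future's value.
    static CompletableFuture<String> flat() {
        return CompletableFuture.runAsync(() -> { /* send the message here */ })
                .thenCompose(ignored -> responseFuture("pong"));
    }
}
```

With thenCompose() the caller gets a plain CompletableFuture<String> and can call join() once, whereas the nested variant would need to be unwrapped twice.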

Request-Response Messages

We'll create a utility class that waits for a server response. It'll be the Request class; with it, everything should fall into place:

class Request {

    private final Object lock = new Object();

    // Both fields are only read and written while holding the lock.
    private boolean waiting = true;
    private IMessage responce;

    CompletableFuture<IMessage> getFuture() {
        return CompletableFuture.supplyAsync(() -> {
            synchronized (lock) {
                // Checking the flag under the lock ensures that a notification
                // can't slip in between the check and the call to wait().
                while (waiting) {
                    try {
                        lock.wait();
                    } catch (InterruptedException ignored) {}
                }
                return responce;
            }
        });
    }

    void onResponce(IMessage message) {
        synchronized (lock) {
            this.responce = message;
            waiting = false;
            lock.notify();
        }
    }
}

The class has only two methods: getFuture() and onResponce(). The first one creates a future whose thread is put to sleep by calling the wait() method. Only the onResponce() method, which is called when a response is received from the server, can wake it up. This simple trick creates the impression of request-response communication.
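As a side note, the same effect can be achieved without blocking a thread in wait(). The following sketch is an alternative, not the Request class from this lesson: the FutureRequestSketch name and the String message type are assumptions, and the future is simply completed by hand with complete().

```java
import java.util.concurrent.CompletableFuture;

// Alternative sketch: the future is completed directly,
// so no thread has to sleep while the response is pending.
class FutureRequestSketch {

    private final CompletableFuture<String> future = new CompletableFuture<>();

    // Returns the future that will hold the response.
    CompletableFuture<String> getFuture() {
        return future;
    }

    // Called by the reader side when the response arrives.
    void onResponse(String message) {
        future.complete(message);
    }
}
```

This variant also behaves correctly if the response arrives before anyone asks for the future, because a completed CompletableFuture keeps its value.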

Finally, we'll just add the implementation of the remaining methods, required by the interface:

@Override
public ConnectionState getConnectionState() {
    return connectionState.get();
}

@Override
public ReadOnlyObjectProperty<ConnectionState> connectionStateProperty() {
    return connectionState.getReadOnlyProperty();
}

@Override
public String getConnectedServerName() {
    return connectedServerName.get();
}

That would be all for this lesson.

Next time, in the lesson Java Chat - Client - Server Connection Part 3, we'll connect to the server.


 

Article has been written for you by Petr Štechmüller