001package jmri.jmrix.roco.z21;
002
003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
004import java.net.DatagramPacket;
005import java.util.ArrayList;
006import java.util.Arrays;
007import java.util.List;
008
009import jmri.jmrix.*;
010import org.slf4j.Logger;
011import org.slf4j.LoggerFactory;
012
/**
 * Traffic controller for the Roco Z21, exchanging messages and replies with
 * the command station as UDP datagrams.
 *
 * @author Paul Bender Copyright (C) 2014
 */
018public class Z21TrafficController extends jmri.jmrix.AbstractMRTrafficController implements Z21Interface {
019
020    private java.net.InetAddress host;
021    private int port;
022
023    public Z21TrafficController() {
024        super();
025        allowUnexpectedReply = true;
026    }
027
028    /**
029     * Implement this to forward a specific message type to a protocol-specific
030     * listener interface. This puts the casting into the concrete class.
031     */
032    @Override
033    protected void forwardMessage(AbstractMRListener client, AbstractMRMessage m) {
034        ((Z21Listener) client).message((Z21Message) m);
035    }
036
037    /**
038     * Implement this to forward a specific Reply type to a protocol-specific
039     * listener interface. This puts the casting into the concrete class.
040     */
041    @Override
042    protected void forwardReply(AbstractMRListener client, AbstractMRReply m) {
043        ((Z21Listener) client).reply((Z21Reply) m);
044    }
045
046    /**
047     * Invoked if it's appropriate to do low-priority polling of the command
048     * station, this should return the next message to send, or null if the TC
049     * should just sleep.
050     */
051    @Override
052    protected Z21Message pollMessage() {
053        return null;
054    }
055
056    @Override
057    protected Z21Listener pollReplyHandler() {
058        return null;
059    }
060
061    /**
062     * enterProgMode() and enterNormalMode() return any message that
063     * needs to be returned to the command station to change modes.
064     *
065     * @see #enterNormalMode()
066     * @return if no message is needed, you may return null.
067     *
068     * If the programmerIdle() function returns true, enterNormalMode() is
069     * called after a timeout while in IDLESTATE during programming to
070     * return the system to normal mode.
071     */
072    @Override
073    protected Z21Message enterProgMode() {
074        return null;
075    }
076
077    /**
078     * enterProgMode() and enterNormalMode() return any message that
079     * needs to be returned to the command station to change modes.
080     *
081     * @see #enterProgMode()
082     * @return if no message is needed, you may return null.
083     */
084    @Override
085    protected Z21Message enterNormalMode() {
086        return null;
087    }
088
089    /**
090     * Actually transmits the next message to the port.
091     */
092    @SuppressFBWarnings(value = {"TLW_TWO_LOCK_WAIT", "", "UW_UNCOND_WAIT"},
093            justification = "Two locks needed for synchronization here, this is OK; String + only used for debug, so inefficient String processing not really a problem; Unconditional Wait is to give external hardware, which doesn't necessarilly respond, time to process the data.")
094    @Override
095    synchronized protected void forwardToPort(AbstractMRMessage m, AbstractMRListener reply) {
096        if (log.isDebugEnabled()) {
097            log.debug("forwardToPort message: [{}]", m);
098        }
099        // remember who sent this
100        mLastSender = reply;
101
102        // forward the message to the registered recipients,
103        // which includes the communications monitor, except the sender.
104        // Schedule notification via the Swing event queue to ensure order
105        Runnable r = new XmtNotifier(m, mLastSender, this);
106        javax.swing.SwingUtilities.invokeLater(r);
107
108        // stream to port in single write, as that's needed by serial
109        byte[] msg = new byte[lengthOfByteStream(m)];
110        // add header
111        int offset = addHeaderToOutput(msg, m);
112
113        // add data content
114        int len = m.getNumDataElements();
115        for (int i = 0; i < len; i++) {
116            msg[i + offset] = (byte) m.getElement(i);
117        }
118        // add trailer
119        addTrailerToOutput(msg, len + offset, m);
120        // and send the bytes
121        try {
122            if (log.isDebugEnabled()) {
123                StringBuilder f = new StringBuilder("formatted message: ");
124                for (byte b : msg) {
125                    f.append(Integer.toHexString(0xFF & b));
126                    f.append(" ");
127                }
128                log.debug(new String(f));
129            }
130            while (m.getRetries() >= 0) {
131                if (portReadyToSend(controller)) {
132                    // create a datagram with the data from the
133                    // message.
134                    byte[] data = ((Z21Message) m).getBuffer();
135                    DatagramPacket sendPacket
136                            = new DatagramPacket(data, ((Z21Message) m).getLength(), host, port);
137                    // and send it.
138                    ((Z21Adapter) controller).getSocket().send(sendPacket);
139                    log.debug("written, msg timeout: {} mSec", m.getTimeout());
140                    break;
141                } else if (m.getRetries() >= 0) {
142                    if (log.isDebugEnabled()) {
143                        StringBuilder b = new StringBuilder("Retry message: ");
144                        b.append(m.toString());
145                        b.append(" attempts remaining: ");
146                        b.append(m.getRetries());
147                        log.debug(new String(b));
148                    }
149                    m.setRetries(m.getRetries() - 1);
150                    try {
151                        synchronized (xmtRunnable) {
152                            xmtRunnable.wait(m.getTimeout());
153                        }
154                    } catch (InterruptedException e) {
155                        Thread.currentThread().interrupt(); // retain if needed later
156                        if(!threadStopRequest) {
157                           log.error("retry wait interrupted");
158                        } else {
159                           log.error("retry wait interrupted during thread stop");
160                        }
161                    }
162                } else {
163                    log.warn("sendMessage: port not ready for data sending: {}", java.util.Arrays.toString(msg));
164                }
165            }
166        } catch (Exception e) {
167            // TODO Currently there's no port recovery if an exception occurs
168            // must restart JMRI to clear xmtException.
169            xmtException = true;
170            portWarn(e);
171        }
172    }
173
174    @Override()
175    public boolean status() {
176        if (controller == null) {
177            return false;
178        } else {
179            return (controller.status());
180        }
181    }
182
183    /**
184     * Make connection to existing PortController object.
185     */
186    @Override
187    public void connectPort(AbstractPortController p) {
188        rcvException = false;
189        xmtException = false;
190        if (controller != null) {
191            log.warn("connectPort: connect called while connected");
192        } else {
193            log.debug("connectPort invoked");
194        }
195        if (!(p instanceof Z21Adapter)) {
196            throw new IllegalArgumentException("attempt to connect wrong port type");
197        }
198        controller = p;
199        try {
200            host = java.net.InetAddress.getByName(((Z21Adapter) controller).getHostName());
201            port = ((Z21Adapter) controller).getPort();
202            ConnectionStatus.instance().setConnectionState(
203                    p.getSystemConnectionMemo(), ConnectionStatus.CONNECTION_UP);
204        } catch (java.net.UnknownHostException uhe) {
205            log.error("Unknown Host: {} ", ((Z21Adapter) controller).getHostName());
206            ConnectionStatus.instance().setConnectionState(
207                    p.getSystemConnectionMemo(), ConnectionStatus.CONNECTION_DOWN);
208        }
209        // and start threads
210        xmtThread = new Thread(xmtRunnable = () -> {
211            try {
212                transmitLoop();
213            } catch (Throwable e) {
214                if(!threadStopRequest)
215                    log.error("Transmit thread terminated prematurely by: {}", e.toString(), e);
216            }
217        });
218        xmtThread.setName("z21.Z21TrafficController Transmit thread");
219        xmtThread.start();
220        rcvThread = new Thread(this::receiveLoop);
221        rcvThread.setName("z21.Z21TrafficController Receive thread");
222        int xr = rcvThread.getPriority();
223        xr++;
224        rcvThread.setPriority(xr);      //bump up the priority
225        rcvThread.start();
226    }
227
228    /**
229     * Break connection to existing PortController object. Once broken, attempts
230     * to send via "message" member will fail.
231     */
232    @Override
233    public void disconnectPort(AbstractPortController p) {
234        if (controller != p) {
235            log.warn("disconnectPort: disconnect called from non-connected AbstractPortController");
236        }
237        controller = null;
238    }
239
240    @Override
241    protected Z21Reply newReply() {
242        return new Z21Reply();
243    }
244
245    @Override
246    protected boolean endOfMessage(AbstractMRReply r) {
247        // since this is a UDP protocol, and each reply in the packet is complete,
248        // we don't check for end of message manually.
249        return true;
250    }
251
252    /**
253     * Handle each reply when complete.
254     * <p>
255     * (This is public for testing purposes) Runs in the "Receive" thread.
256     */
257    @SuppressFBWarnings(value = {"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP", "NO_NOTIFY_NOT_NOTIFYALL"},
258            justification = "Wait is for external hardware, which doesn't necessarilly respond, to process the data.  Notify is used because Having more than one thread waiting on xmtRunnable is an error.")
259    @Override
260    public void handleOneIncomingReply() throws java.io.IOException {
261        // we sit in this until the message is complete, relying on
262        // threading to let other stuff happen
263
264        // create a buffer to hold the incoming data.
265        byte[] buffer = new byte[100];  // the size here just needs to be longer
266        // than the longest protocol message.
267        // Otherwise, the receive will truncate.
268
269        // create the packet.
270        DatagramPacket receivePacket = new DatagramPacket(buffer, 100, host, port);
271
272        // and wait to receive data in the packet.
273        try {
274            ((Z21Adapter) controller).getSocket().receive(receivePacket);
275        } catch (java.net.SocketException | NullPointerException se) {
276            // if we are waiting when the controller is disposed,
277            // a socket exception will be thrown.
278            log.debug("Socket exception during receive.  Connection Closed?");
279            rcvException = true;
280            return;
281        }
282        if (threadStopRequest) return;
283
284        // handle more than one reply in the same UDP packet.
285        List<Z21Reply> replies = new ArrayList<>();
286
287        int totalLength=receivePacket.getLength();
288        int consumed=0;
289
290        do {
291            int length = (0xff & buffer[0]) + ((0xff & buffer[1]) << 8);
292            Z21Reply msg = new Z21Reply(buffer, length);
293
294            replies.add(msg);
295
296            buffer = Arrays.copyOfRange(buffer,length,buffer.length);
297            consumed +=length;
298            log.trace("total length: {} consumed {}",totalLength,consumed);
299        } while(totalLength>consumed);
300
301
302        // and then dispatch each reply
303        replies.forEach(this::dispatchReply);
304    }
305
306    private void dispatchReply(Z21Reply msg) {
307        // message is complete, dispatch it !!
308        replyInDispatch = true;
309        if (log.isDebugEnabled()) {
310            log.debug("dispatch reply of length {} contains {} state {}", msg.getNumDataElements(), msg.toString(), mCurrentState);
311        }
312
313        // forward the message to the registered recipients,
314        // which includes the communications monitor
315        // return a notification via the Swing event queue to ensure proper thread
316        Runnable r = new RcvNotifier(msg, mLastSender, this);
317        try {
318            javax.swing.SwingUtilities.invokeAndWait(r);
319        } catch (InterruptedException ie) {
320            if(threadStopRequest) return;
321            log.error("Unexpected exception in invokeAndWait:{}", ie, ie);
322        } catch (Exception e) {
323            log.error("Unexpected exception in invokeAndWait:{}", e, e);
324        }
325        if (log.isDebugEnabled()) {
326            log.debug("dispatch thread invoked");
327        }
328
329        if (!msg.isUnsolicited()) {
330            // effect on transmit:
331            switch (mCurrentState) {
332                case WAITMSGREPLYSTATE: {
333                    // check to see if the response was an error message we want
334                    // to automatically handle by re-queueing the last sent
335                    // message, otherwise go on to the next message
336                    if (msg.isRetransmittableErrorMsg()) {
337                        if (log.isDebugEnabled()) {
338                            log.debug("Automatic Recovery from Error Message: +msg.toString()");
339                        }
340                        synchronized (xmtRunnable) {
341                            mCurrentState = AUTORETRYSTATE;
342                            replyInDispatch = false;
343                            xmtRunnable.notify();
344                        }
345                    } else {
346                        // update state, and notify to continue
347                        synchronized (xmtRunnable) {
348                            mCurrentState = NOTIFIEDSTATE;
349                            replyInDispatch = false;
350                            xmtRunnable.notify();
351                        }
352                    }
353                    break;
354                }
355                case WAITREPLYINPROGMODESTATE: {
356                    // entering programming mode
357                    mCurrentMode = PROGRAMINGMODE;
358                    replyInDispatch = false;
359
360                    // check to see if we need to delay to allow decoders to become
361                    // responsive
362                    int warmUpDelay = enterProgModeDelayTime();
363                    if (warmUpDelay != 0) {
364                        try {
365                            synchronized (xmtRunnable) {
366                                xmtRunnable.wait(warmUpDelay);
367                            }
368                        } catch (InterruptedException e) {
369                            Thread.currentThread().interrupt(); // retain if needed later
370                            if (threadStopRequest) return;
371                        }
372                    }
373                    // update state, and notify to continue
374                    synchronized (xmtRunnable) {
375                        mCurrentState = OKSENDMSGSTATE;
376                        xmtRunnable.notify();
377                    }
378                    break;
379                }
380                case WAITREPLYINNORMMODESTATE: {
381                    // entering normal mode
382                    mCurrentMode = NORMALMODE;
383                    replyInDispatch = false;
384                    // update state, and notify to continue
385                    synchronized (xmtRunnable) {
386                        mCurrentState = OKSENDMSGSTATE;
387                        xmtRunnable.notify();
388                    }
389                    break;
390                }
391                default: {
392                    replyInDispatch = false;
393                    if (allowUnexpectedReply) {
394                        if (log.isDebugEnabled()) {
395                            log.debug("Allowed unexpected reply received in state: {} was {}", mCurrentState, msg.toString());
396                        }
397                        synchronized (xmtRunnable) {
398                            // The transmit thread sometimes gets stuck
399                            // when unexpected replies are received.  Notify
400                            // it to clear the block without a timeout.
401                            // (do not change the current state)
402                            //if(mCurrentState!=IDLESTATE)
403                            xmtRunnable.notify();
404                        }
405                    } else {
406                        unexpectedReplyStateError(mCurrentState,msg.toString());
407                    }
408                }
409            }
410            // Unsolicited message
411        } else {
412            if (log.isDebugEnabled()) {
413                log.debug("Unsolicited Message Received {}", msg.toString());
414            }
415
416            replyInDispatch = false;
417        }
418    }
419
420    @SuppressFBWarnings(value = {"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP"},
421            justification = "Wait is for external hardware, which doesn't necessarilly respond, to process the data.")
422    @Override
423    protected void terminate() {
424        if (controller == null) {
425            log.debug("terminate called while not connected");
426            return;
427        } else {
428            log.debug("Cleanup Starts");
429        }
430
431        Z21Message logoffMessage = Z21Message.getLanLogoffRequestMessage();
432        forwardToPort(logoffMessage, null);
433        // wait for reply
434        try {
435            if (xmtRunnable != null) {
436                synchronized (xmtRunnable) {
437                    xmtRunnable.wait(logoffMessage.getTimeout());
438                }
439            }
440        } catch (InterruptedException e) {
441            Thread.currentThread().interrupt(); // retain if needed later
442            log.error("transmit interrupted");
443        } finally {
444            // set the controller to null, even if terminate fails.
445            controller = null;
446        }
447    }
448
449    /**
450     * Terminate the receive and transmit threads.
451     * <p>
452     * This is intended to be used only by testing subclasses.
453     */
454    @Override
455    public void terminateThreads() {
456        threadStopRequest = true;
457        // ensure socket closed to end pending operations
458        if ( controller != null && ((Z21Adapter) controller).getSocket() != null) ((Z21Adapter) controller).getSocket().close();
459
460        // usual stop process
461        super.terminateThreads();
462    }
463
464    // The methods to implement the Z21Interface
465    @Override
466    public synchronized void addz21Listener(Z21Listener l) {
467        this.addListener(l);
468    }
469
470    @Override
471    public synchronized void removez21Listener(Z21Listener l) {
472        this.removeListener(l);
473    }
474
475    /**
476     * Forward a preformatted message to the actual interface.
477     */
478    @Override
479    public void sendz21Message(Z21Message m, Z21Listener reply) {
480        sendMessage(m, reply);
481    }
482
483    private static final Logger log = LoggerFactory.getLogger(Z21TrafficController.class);
484}