001package jmri.jmrix.sprog;
002
003import java.io.DataInputStream;
004import java.io.OutputStream;
005import java.util.Vector;
006import java.util.concurrent.BlockingQueue;
007import java.util.concurrent.LinkedBlockingQueue;
008
009import jmri.jmrix.AbstractPortController;
010import jmri.jmrix.sprog.SprogConstants.SprogState;
011import jmri.jmrix.sprog.serialdriver.SerialDriverAdapter;
012
013/**
014 * Converts Stream-based I/O to/from Sprog messages. The "SprogInterface" side
015 * sends/receives message objects. The connection to a SprogPortController is
016 * via a pair of *Streams, which then carry sequences of characters for
017 * transmission. Note that this processing is handled in an independent thread.
018 * <p>
019 * Rewritten during 4.11.x series. Create a high priority thread for the tc to
020 * move everything off the swing thread. Use a blocking queue to handle
021 * asynchronous messages from multiple sources.
022 *
023 * @author Bob Jacobsen Copyright (C) 2001
024 * @author Andrew Crosland Copyright (C) 2018
025 */
026public class SprogTrafficController implements SprogInterface,
027        Runnable {
028
029    private SprogReply reply = new SprogReply();
030    SprogListener lastSender = null;
031    private SprogState sprogState = SprogState.NORMAL;
032    private int lastId;
033
034    private Thread tcThread;
035    private final Object lock = new Object();
036    private boolean replyAvailable = false;
037    // Make this public so it can be overridden by a script for debug
038    public int timeout = SprogConstants.TC_PROG_REPLY_TIMEOUT;
039
040    /**
041     * Create a new SprogTrafficController instance.
042     *
043     * @param adaptermemo the associated SystemConnectionMemo
044     */
045    @edu.umd.cs.findbugs.annotations.SuppressFBWarnings(value="SC_START_IN_CTOR", justification="done at end, waits for data")
046    public SprogTrafficController(SprogSystemConnectionMemo adaptermemo) {
047        memo = adaptermemo;
048        init();
049    }
050
051    private void init() {
052        // Set the timeout for communication with hardware
053        resetTimeout();
054
055        tcThread = jmri.util.ThreadingUtil.newThread(this);
056        tcThread.setName("SPROG TC thread");
057        tcThread.setPriority(Thread.MAX_PRIORITY-1);
058        tcThread.setDaemon(true);
059        log.debug("starting TC thread from {} ", this, jmri.util.LoggingUtil.shortenStacktrace(new Exception("traceback"),6));
060        tcThread.start();
061    }
062
063    // Methods to implement the Sprog Interface
064
065    protected Vector<SprogListener> cmdListeners = new Vector<SprogListener>();
066
067    @Override
068    public boolean status() {
069        return (ostream != null && istream != null);
070    }
071
072    /**
073     * Check if the Sprog TC Thread ( started on construction of
074     * SprogTrafficController ) is alive.
075     * For testing purposes.
076     * @return true if alive, else false.
077     */
078    public boolean isTcThreadAlive() {
079        return tcThread.isAlive();
080    }
081
082    @Override
083    public synchronized void addSprogListener(SprogListener l) {
084        // add only if not already registered
085        if (l == null) {
086            throw new java.lang.NullPointerException();
087        }
088        if (!cmdListeners.contains(l)) {
089            cmdListeners.addElement(l);
090            log.debug("SprogListener added to {} tc", memo.getUserName());
091        }
092    }
093
094    @Override
095    public synchronized void removeSprogListener(SprogListener l) {
096        if (cmdListeners.contains(l)) {
097            cmdListeners.removeElement(l);
098        }
099    }
100
101    /**
102     * Reset timeout to default depending on current mode
103     */
104    public void resetTimeout() {
105        if (memo.getSprogMode() == SprogConstants.SprogMode.OPS) {
106            timeout = SprogConstants.TC_OPS_REPLY_TIMEOUT;
107        } else {
108            timeout = SprogConstants.TC_PROG_REPLY_TIMEOUT;
109        }
110    }
111
112    public void setTimeout(int t) {
113        timeout = t;
114    }
115
116    public SprogState getSprogState() {
117        return sprogState;
118    }
119
120    public void setSprogState(SprogState s) {
121        this.sprogState = s;
122        if (s == SprogState.V4BOOTMODE) {
123            // enable flow control - required for sprog v4 bootloader
124            var controller = getController();
125            controller.setHandshake(jmri.jmrix.AbstractSerialPortController.FlowControl.RTSCTS);
126
127        } else {
128            // disable flow control
129            // removed Jan 2010 - this stops SPROG from sending. Could cause problems with
130            // serial Sprogs, but I have no way of testing:
131            // getController().setHandshake(false);
132        }
133        log.debug("Setting sprogState {}", s);
134    }
135
136    public boolean isNormalMode() {
137        return sprogState == SprogState.NORMAL;
138    }
139
140    public boolean isSIIBootMode() {
141        return sprogState == SprogState.SIIBOOTMODE;
142    }
143
144    public boolean isV4BootMode() {
145        return sprogState == SprogState.V4BOOTMODE;
146    }
147
148    @SuppressWarnings("unchecked")
149    private synchronized Vector<SprogListener> getCopyOfListeners() {
150        return (Vector<SprogListener>) cmdListeners.clone();
151
152    }
153
154    protected synchronized void notifyMessage(SprogMessage m, SprogListener originator) {
155        for (SprogListener listener : this.getCopyOfListeners()) {
156            try {
157                // don't send it back to the originator!
158                if (listener != originator) {
159                    // skip forwarding to the last sender for now, we'll get them later
160                    if (lastSender != listener) {
161                        listener.notifyMessage(m);
162                    }
163                }
164            } catch (Exception e) {
165                log.warn("notify: During dispatch to {}", listener, e);
166            }
167        }
168        // forward to the last listener who sent a message
169        // this is done _second_ so monitoring can have already stored the reply
170        // before a response is sent
171        if (lastSender != null && lastSender != originator) {
172            lastSender.notifyMessage(m);
173        }
174    }
175
176    protected synchronized void notifyReply(SprogReply r) {
177        log.debug("notifyReply starts for later, last sender: {}", lastSender);
178
179        final Vector<SprogListener> listeners = this.getCopyOfListeners();
180        final SprogReply replyForLater = r;
181        final SprogListener senderForLater = lastSender;
182
183        // Notify non-sender listeners on the GUI thread (e.g. monitor frames)
184        Runnable rl = () -> {
185            for (SprogListener listener : listeners) {
186                try {
187                    // don't send message back to the originator!
188                    // skip forwarding to the last sender for now, we'll get them later
189                    if (senderForLater != listener) {
190                        listener.notifyReply(replyForLater);
191                    }
192
193                } catch (Exception e) {
194                    log.warn("notify: During dispatch to {}", listener, e);
195                }
196            }
197        };
198        javax.swing.SwingUtilities.invokeLater(rl);
199
200        // Notify the sender synchronously on the current thread so that
201        // time-critical listeners (e.g. SprogCommandStation slot thread)
202        // are woken immediately, without waiting for the EDT to be free.
203        if (senderForLater != null) {
204            senderForLater.notifyReply(replyForLater);
205        }
206    }
207
208    protected synchronized void notifyReply(SprogReply r, SprogListener lastSender) {
209        log.debug("notifyReply starts for later, last sender: {}", lastSender);
210
211        final Vector<SprogListener> listeners = this.getCopyOfListeners();
212        final SprogReply replyForLater = r;
213        final SprogListener senderForLater = lastSender;
214
215        // Notify non-sender listeners on the GUI thread (e.g. monitor frames)
216        Runnable rl = () -> {
217            log.debug("notifyReply starts last sender: {}", senderForLater);
218            for (SprogListener listener : listeners) {
219                try {
220                //if is message don't send it back to the originator!
221                    // skip forwarding to the last sender for now, we'll get them later
222                    if (senderForLater != listener) {
223                        log.debug("Notify listener: {} {}", listener, r.toString());
224                        listener.notifyReply(replyForLater);
225                    }
226
227                } catch (Exception e) {
228                    log.warn("notify: During dispatch to {}", listener, e);
229                }
230            }
231        };
232        javax.swing.SwingUtilities.invokeLater(rl);
233
234        // Notify the sender synchronously on the current thread so that
235        // time-critical listeners (e.g. SprogCommandStation slot thread)
236        // are woken immediately, without waiting for the EDT to be free.
237        if (senderForLater != null) {
238            log.debug("notify last sender: {} {}", senderForLater, replyForLater.toString());
239            senderForLater.notifyReply(replyForLater);
240        }
241    }
242
243    // A class to remember the message and who sent it
244    private static class MessageTuple {
245        private final SprogMessage message;
246        private final SprogListener listener;
247
248        public MessageTuple(SprogMessage m, SprogListener l) {
249            message = m;
250            listener = l;
251        }
252
253        // Copy constructor
254        public MessageTuple(MessageTuple mt) {
255            message = mt.message;
256            listener = mt.listener;
257        }
258    }
259
260    // The queue to hold messages being sent
261    BlockingQueue<MessageTuple> sendQueue = new LinkedBlockingQueue<MessageTuple>();
262
263    /**
264     * Enqueue a preformatted message to be sent to the actual interface
265     *
266     * @param m The message to be forwarded
267     */
268    public void sendSprogMessage(SprogMessage m) {
269        log.debug("Add message to queue: [{}] id: {}", m.toString(isSIIBootMode()), m.getId());
270        try {
271            sendQueue.add(new MessageTuple(m, null));
272        } catch (Exception e) {
273            log.error("Could not add message to queue", e);
274        }
275    }
276
277    /**
278     * Enqueue a preformatted message to be sent to the actual interface
279     *
280     * @param m         Message to send
281     * @param replyTo   Who is sending the message
282     */
283    @Override
284    public synchronized void sendSprogMessage(SprogMessage m, SprogListener replyTo) {
285        log.debug("Add message to queue: [{}] id: {}", m.toString(isSIIBootMode()), m.getId());
286        try {
287            sendQueue.add(new MessageTuple(m, replyTo));
288        } catch (Exception e) {
289            log.error("Could not add message to queue", e);
290        }
291    }
292
293    /**
294     * Block until a message is available from the queue, send it to the interface
295     * and then block until reply is received or a timeout occurs. This will be
296     * a very long timeout to allow for page mode programming operations in SPROG
297     * programmer mode.
298     */
299    @Override
300    public void run() {
301        MessageTuple messageToSend;
302        log.debug("Traffic controller queuing thread starts");
303        while (true) {
304            log.debug("Traffic controller queue waiting");
305            try {
306                messageToSend = new MessageTuple(sendQueue.take());
307            } catch (InterruptedException e) {
308                log.debug("Thread interrupted while dequeuing message to send");
309                return;
310            }
311            log.debug("Message dequeued {} id: {}", messageToSend.message, messageToSend.message.getId());
312            // remember who sent this
313            lastSender = messageToSend.listener;
314            lastId = messageToSend.message.getId();
315            // notify all _other_ listeners
316            notifyMessage(messageToSend.message, messageToSend.listener);
317            replyAvailable = false;
318            sendToInterface(messageToSend.message);
319            log.debug("Waiting {} for a reply", timeout);
320            try {
321                synchronized (lock) {
322                    lock.wait(timeout); // Wait for notify
323                }
324            } catch (InterruptedException e) {
325                log.debug("waitingForReply interrupted");
326            }
327            if (!replyAvailable) {
328                // Timed out
329                log.warn("Timeout waiting for reply from hardware in SprogState {}", sprogState);
330            } else {
331                log.debug("Notified of reply");
332            }
333        }
334    }
335
336    /**
337     * Forward a preformatted message to the interface.
338     *
339     * @param m The message to be forwarded
340     */
341    public void sendToInterface(SprogMessage m) {
342        // stream to port in single write, as that's needed by serial
343        try {
344            if (ostream != null) {
345                ostream.write(m.getFormattedMessage(sprogState));
346                log.debug("sendSprogMessage written to ostream");
347            } else {
348                // no stream connected
349                log.warn("sendMessage: no connection established");
350            }
351        } catch (Exception e) {
352            log.warn("sendMessage: Exception: ", e);
353        }
354    }
355
356// methods to connect/disconnect to a source of data in a SprogPortController
357    private AbstractPortController controller = null;
358
359    /**
360     * Make connection to existing PortController object.
361     *
362     * @param p The port controller
363     */
364    public void connectPort(AbstractPortController p) {
365        istream = p.getInputStream();
366        ostream = p.getOutputStream();
367        if (controller != null) {
368            log.warn("connectPort: connect called while connected");
369        }
370        controller = p;
371    }
372
373    /**
374     * Get the port controller, as a SerialDriverAdapter.
375     *
376     * @return the port controller
377     */
378    protected SerialDriverAdapter getController(){
379       return (SerialDriverAdapter) controller;
380    }
381
382    /**
383     * Break connection to existing SprogPortController object.
384     * <p>
385     * Once broken, attempts to send via "message" member will fail.
386     *
387     * @param p the connection to break
388     */
389    public void disconnectPort(AbstractPortController p) {
390        istream = null;
391        ostream = null;
392        if (controller != p) {
393            log.warn("disconnectPort: disconnect called from non-connected SprogPortController");
394        }
395        controller = null;
396    }
397
398    static volatile protected SprogTrafficController self = null;
399
400    public void setAdapterMemo(SprogSystemConnectionMemo adaptermemo) {
401        memo = adaptermemo;
402    }
403
404    public SprogSystemConnectionMemo getAdapterMemo() {
405        return memo;
406    }
407
408    private SprogSystemConnectionMemo memo = null;
409
410    // data members to hold the streams
411    DataInputStream istream = null;
412    OutputStream ostream = null;
413
414    boolean endReply(SprogReply msg) {
415        return msg.endNormalReply() || msg.endBootReply();
416    }
417
418    private boolean unsolicited;
419
420    /**
421     * Handle an incoming reply.
422     */
423    public void handleOneIncomingReply() {
424        // we get here if data has been received and this method is explicitly invoked
425        // fill the current reply with any data received
426        int replyCurrentSize = reply.getNumDataElements();
427        int i;
428        for (i = replyCurrentSize; i < SprogReply.maxSize - replyCurrentSize; i++) {
429            try {
430                if (istream.available() == 0) {
431                    break; // nothing waiting to be read
432                }
433                byte char1 = istream.readByte();
434                reply.setElement(i, char1);
435
436            } catch (Exception e) {
437                log.warn("Exception in DATA_AVAILABLE state", e);
438            }
439            if (endReply(reply)) {
440                sendreply();
441                break;
442            }
443        }
444    }
445
446    /**
447     * Send the current reply - built using data from serialEvent.
448     */
449    private void sendreply() {
450        //send the reply
451        log.debug("dispatch reply of length {} in SprogState {}", reply.getNumDataElements(), sprogState);
452        if (unsolicited) {
453            log.debug("Unsolicited Reply");
454            reply.setUnsolicited();
455        }
456        // Insert the id
457        reply.setId(lastId);
458        notifyReply(reply, lastSender);
459        log.debug("Notify() wait");
460        replyAvailable = true;
461        synchronized(lock) {
462            lock.notifyAll();
463        }
464
465        //Create a new reply, ready to be filled
466        reply = new SprogReply();
467    }
468
469    public void dispose(){
470        tcThread.interrupt();
471        try {
472            tcThread.join();
473        } catch (InterruptedException e) {
474            // Do nothing
475        }
476    }
477
478    private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(SprogTrafficController.class);
479
480}