001package jmri.jmrix.sprog; 002 003import java.io.DataInputStream; 004import java.io.OutputStream; 005import java.util.Vector; 006import java.util.concurrent.BlockingQueue; 007import java.util.concurrent.LinkedBlockingQueue; 008 009import jmri.jmrix.AbstractPortController; 010import jmri.jmrix.sprog.SprogConstants.SprogState; 011import jmri.jmrix.sprog.serialdriver.SerialDriverAdapter; 012 013/** 014 * Converts Stream-based I/O to/from Sprog messages. The "SprogInterface" side 015 * sends/receives message objects. The connection to a SprogPortController is 016 * via a pair of *Streams, which then carry sequences of characters for 017 * transmission. Note that this processing is handled in an independent thread. 018 * <p> 019 * Rewritten during 4.11.x series. Create a high priority thread for the tc to 020 * move everything off the swing thread. Use a blocking queue to handle 021 * asynchronous messages from multiple sources. 022 * 023 * @author Bob Jacobsen Copyright (C) 2001 024 * @author Andrew Crosland Copyright (C) 2018 025 */ 026public class SprogTrafficController implements SprogInterface, 027 Runnable { 028 029 private SprogReply reply = new SprogReply(); 030 SprogListener lastSender = null; 031 private SprogState sprogState = SprogState.NORMAL; 032 private int lastId; 033 034 private Thread tcThread; 035 private final Object lock = new Object(); 036 private boolean replyAvailable = false; 037 // Make this public so it can be overridden by a script for debug 038 public int timeout = SprogConstants.TC_PROG_REPLY_TIMEOUT; 039 040 /** 041 * Create a new SprogTrafficController instance. 042 * 043 * @param adaptermemo the associated SystemConnectionMemo 044 */ 045 @edu.umd.cs.findbugs.annotations.SuppressFBWarnings(value="SC_START_IN_CTOR", justification="done at end, waits for data") 046 public SprogTrafficController(SprogSystemConnectionMemo adaptermemo) { 047 memo = adaptermemo; 048 init(); 049 } 050 051 private void init() { 052 // Set the timeout for communication with hardware 053 resetTimeout(); 054 055 tcThread = jmri.util.ThreadingUtil.newThread(this); 056 tcThread.setName("SPROG TC thread"); 057 tcThread.setPriority(Thread.MAX_PRIORITY-1); 058 tcThread.setDaemon(true); 059 log.debug("starting TC thread from {} ", this, jmri.util.LoggingUtil.shortenStacktrace(new Exception("traceback"),6)); 060 tcThread.start(); 061 } 062 063 // Methods to implement the Sprog Interface 064 065 protected Vector<SprogListener> cmdListeners = new Vector<SprogListener>(); 066 067 @Override 068 public boolean status() { 069 return (ostream != null && istream != null); 070 } 071 072 /** 073 * Check if the Sprog TC Thread ( started on construction of 074 * SprogTrafficController ) is alive. 075 * For testing purposes. 076 * @return true if alive, else false. 077 */ 078 public boolean isTcThreadAlive() { 079 return tcThread.isAlive(); 080 } 081 082 @Override 083 public synchronized void addSprogListener(SprogListener l) { 084 // add only if not already registered 085 if (l == null) { 086 throw new java.lang.NullPointerException(); 087 } 088 if (!cmdListeners.contains(l)) { 089 cmdListeners.addElement(l); 090 log.debug("SprogListener added to {} tc", memo.getUserName()); 091 } 092 } 093 094 @Override 095 public synchronized void removeSprogListener(SprogListener l) { 096 if (cmdListeners.contains(l)) { 097 cmdListeners.removeElement(l); 098 } 099 } 100 101 /** 102 * Reset timeout to default depending on current mode 103 */ 104 public void resetTimeout() { 105 if (memo.getSprogMode() == SprogConstants.SprogMode.OPS) { 106 timeout = SprogConstants.TC_OPS_REPLY_TIMEOUT; 107 } else { 108 timeout = SprogConstants.TC_PROG_REPLY_TIMEOUT; 109 } 110 } 111 112 public void setTimeout(int t) { 113 timeout = t; 114 } 115 116 public SprogState getSprogState() { 117 return sprogState; 118 } 119 120 public void setSprogState(SprogState s) { 121 this.sprogState = s; 122 if (s == SprogState.V4BOOTMODE) { 123 // enable flow control - required for sprog v4 bootloader 124 var controller = getController(); 125 controller.setHandshake(jmri.jmrix.AbstractSerialPortController.FlowControl.RTSCTS); 126 127 } else { 128 // disable flow control 129 // removed Jan 2010 - this stops SPROG from sending. Could cause problems with 130 // serial Sprogs, but I have no way of testing: 131 // getController().setHandshake(false); 132 } 133 log.debug("Setting sprogState {}", s); 134 } 135 136 public boolean isNormalMode() { 137 return sprogState == SprogState.NORMAL; 138 } 139 140 public boolean isSIIBootMode() { 141 return sprogState == SprogState.SIIBOOTMODE; 142 } 143 144 public boolean isV4BootMode() { 145 return sprogState == SprogState.V4BOOTMODE; 146 } 147 148 @SuppressWarnings("unchecked") 149 private synchronized Vector<SprogListener> getCopyOfListeners() { 150 return (Vector<SprogListener>) cmdListeners.clone(); 151 152 } 153 154 protected synchronized void notifyMessage(SprogMessage m, SprogListener originator) { 155 for (SprogListener listener : this.getCopyOfListeners()) { 156 try { 157 // don't send it back to the originator! 158 if (listener != originator) { 159 // skip forwarding to the last sender for now, we'll get them later 160 if (lastSender != listener) { 161 listener.notifyMessage(m); 162 } 163 } 164 } catch (Exception e) { 165 log.warn("notify: During dispatch to {}", listener, e); 166 } 167 } 168 // forward to the last listener who sent a message 169 // this is done _second_ so monitoring can have already stored the reply 170 // before a response is sent 171 if (lastSender != null && lastSender != originator) { 172 lastSender.notifyMessage(m); 173 } 174 } 175 176 protected synchronized void notifyReply(SprogReply r) { 177 log.debug("notifyReply starts for later, last sender: {}", lastSender); 178 179 final Vector<SprogListener> listeners = this.getCopyOfListeners(); 180 final SprogReply replyForLater = r; 181 final SprogListener senderForLater = lastSender; 182 183 // Notify non-sender listeners on the GUI thread (e.g. monitor frames) 184 Runnable rl = () -> { 185 for (SprogListener listener : listeners) { 186 try { 187 // don't send message back to the originator! 188 // skip forwarding to the last sender for now, we'll get them later 189 if (senderForLater != listener) { 190 listener.notifyReply(replyForLater); 191 } 192 193 } catch (Exception e) { 194 log.warn("notify: During dispatch to {}", listener, e); 195 } 196 } 197 }; 198 javax.swing.SwingUtilities.invokeLater(rl); 199 200 // Notify the sender synchronously on the current thread so that 201 // time-critical listeners (e.g. SprogCommandStation slot thread) 202 // are woken immediately, without waiting for the EDT to be free. 203 if (senderForLater != null) { 204 senderForLater.notifyReply(replyForLater); 205 } 206 } 207 208 protected synchronized void notifyReply(SprogReply r, SprogListener lastSender) { 209 log.debug("notifyReply starts for later, last sender: {}", lastSender); 210 211 final Vector<SprogListener> listeners = this.getCopyOfListeners(); 212 final SprogReply replyForLater = r; 213 final SprogListener senderForLater = lastSender; 214 215 // Notify non-sender listeners on the GUI thread (e.g. monitor frames) 216 Runnable rl = () -> { 217 log.debug("notifyReply starts last sender: {}", senderForLater); 218 for (SprogListener listener : listeners) { 219 try { 220 //if is message don't send it back to the originator! 221 // skip forwarding to the last sender for now, we'll get them later 222 if (senderForLater != listener) { 223 log.debug("Notify listener: {} {}", listener, r.toString()); 224 listener.notifyReply(replyForLater); 225 } 226 227 } catch (Exception e) { 228 log.warn("notify: During dispatch to {}", listener, e); 229 } 230 } 231 }; 232 javax.swing.SwingUtilities.invokeLater(rl); 233 234 // Notify the sender synchronously on the current thread so that 235 // time-critical listeners (e.g. SprogCommandStation slot thread) 236 // are woken immediately, without waiting for the EDT to be free. 237 if (senderForLater != null) { 238 log.debug("notify last sender: {} {}", senderForLater, replyForLater.toString()); 239 senderForLater.notifyReply(replyForLater); 240 } 241 } 242 243 // A class to remember the message and who sent it 244 private static class MessageTuple { 245 private final SprogMessage message; 246 private final SprogListener listener; 247 248 public MessageTuple(SprogMessage m, SprogListener l) { 249 message = m; 250 listener = l; 251 } 252 253 // Copy constructor 254 public MessageTuple(MessageTuple mt) { 255 message = mt.message; 256 listener = mt.listener; 257 } 258 } 259 260 // The queue to hold messages being sent 261 BlockingQueue<MessageTuple> sendQueue = new LinkedBlockingQueue<MessageTuple>(); 262 263 /** 264 * Enqueue a preformatted message to be sent to the actual interface 265 * 266 * @param m The message to be forwarded 267 */ 268 public void sendSprogMessage(SprogMessage m) { 269 log.debug("Add message to queue: [{}] id: {}", m.toString(isSIIBootMode()), m.getId()); 270 try { 271 sendQueue.add(new MessageTuple(m, null)); 272 } catch (Exception e) { 273 log.error("Could not add message to queue", e); 274 } 275 } 276 277 /** 278 * Enqueue a preformatted message to be sent to the actual interface 279 * 280 * @param m Message to send 281 * @param replyTo Who is sending the message 282 */ 283 @Override 284 public synchronized void sendSprogMessage(SprogMessage m, SprogListener replyTo) { 285 log.debug("Add message to queue: [{}] id: {}", m.toString(isSIIBootMode()), m.getId()); 286 try { 287 sendQueue.add(new MessageTuple(m, replyTo)); 288 } catch (Exception e) { 289 log.error("Could not add message to queue", e); 290 } 291 } 292 293 /** 294 * Block until a message is available from the queue, send it to the interface 295 * and then block until reply is received or a timeout occurs. This will be 296 * a very long timeout to allow for page mode programming operations in SPROG 297 * programmer mode. 298 */ 299 @Override 300 public void run() { 301 MessageTuple messageToSend; 302 log.debug("Traffic controller queuing thread starts"); 303 while (true) { 304 log.debug("Traffic controller queue waiting"); 305 try { 306 messageToSend = new MessageTuple(sendQueue.take()); 307 } catch (InterruptedException e) { 308 log.debug("Thread interrupted while dequeuing message to send"); 309 return; 310 } 311 log.debug("Message dequeued {} id: {}", messageToSend.message, messageToSend.message.getId()); 312 // remember who sent this 313 lastSender = messageToSend.listener; 314 lastId = messageToSend.message.getId(); 315 // notify all _other_ listeners 316 notifyMessage(messageToSend.message, messageToSend.listener); 317 replyAvailable = false; 318 sendToInterface(messageToSend.message); 319 log.debug("Waiting {} for a reply", timeout); 320 try { 321 synchronized (lock) { 322 lock.wait(timeout); // Wait for notify 323 } 324 } catch (InterruptedException e) { 325 log.debug("waitingForReply interrupted"); 326 } 327 if (!replyAvailable) { 328 // Timed out 329 log.warn("Timeout waiting for reply from hardware in SprogState {}", sprogState); 330 } else { 331 log.debug("Notified of reply"); 332 } 333 } 334 } 335 336 /** 337 * Forward a preformatted message to the interface. 338 * 339 * @param m The message to be forwarded 340 */ 341 public void sendToInterface(SprogMessage m) { 342 // stream to port in single write, as that's needed by serial 343 try { 344 if (ostream != null) { 345 ostream.write(m.getFormattedMessage(sprogState)); 346 log.debug("sendSprogMessage written to ostream"); 347 } else { 348 // no stream connected 349 log.warn("sendMessage: no connection established"); 350 } 351 } catch (Exception e) { 352 log.warn("sendMessage: Exception: ", e); 353 } 354 } 355 356// methods to connect/disconnect to a source of data in a SprogPortController 357 private AbstractPortController controller = null; 358 359 /** 360 * Make connection to existing PortController object. 361 * 362 * @param p The port controller 363 */ 364 public void connectPort(AbstractPortController p) { 365 istream = p.getInputStream(); 366 ostream = p.getOutputStream(); 367 if (controller != null) { 368 log.warn("connectPort: connect called while connected"); 369 } 370 controller = p; 371 } 372 373 /** 374 * Get the port controller, as a SerialDriverAdapter. 375 * 376 * @return the port controller 377 */ 378 protected SerialDriverAdapter getController(){ 379 return (SerialDriverAdapter) controller; 380 } 381 382 /** 383 * Break connection to existing SprogPortController object. 384 * <p> 385 * Once broken, attempts to send via "message" member will fail. 386 * 387 * @param p the connection to break 388 */ 389 public void disconnectPort(AbstractPortController p) { 390 istream = null; 391 ostream = null; 392 if (controller != p) { 393 log.warn("disconnectPort: disconnect called from non-connected SprogPortController"); 394 } 395 controller = null; 396 } 397 398 static volatile protected SprogTrafficController self = null; 399 400 public void setAdapterMemo(SprogSystemConnectionMemo adaptermemo) { 401 memo = adaptermemo; 402 } 403 404 public SprogSystemConnectionMemo getAdapterMemo() { 405 return memo; 406 } 407 408 private SprogSystemConnectionMemo memo = null; 409 410 // data members to hold the streams 411 DataInputStream istream = null; 412 OutputStream ostream = null; 413 414 boolean endReply(SprogReply msg) { 415 return msg.endNormalReply() || msg.endBootReply(); 416 } 417 418 private boolean unsolicited; 419 420 /** 421 * Handle an incoming reply. 422 */ 423 public void handleOneIncomingReply() { 424 // we get here if data has been received and this method is explicitly invoked 425 // fill the current reply with any data received 426 int replyCurrentSize = reply.getNumDataElements(); 427 int i; 428 for (i = replyCurrentSize; i < SprogReply.maxSize - replyCurrentSize; i++) { 429 try { 430 if (istream.available() == 0) { 431 break; // nothing waiting to be read 432 } 433 byte char1 = istream.readByte(); 434 reply.setElement(i, char1); 435 436 } catch (Exception e) { 437 log.warn("Exception in DATA_AVAILABLE state", e); 438 } 439 if (endReply(reply)) { 440 sendreply(); 441 break; 442 } 443 } 444 } 445 446 /** 447 * Send the current reply - built using data from serialEvent. 448 */ 449 private void sendreply() { 450 //send the reply 451 log.debug("dispatch reply of length {} in SprogState {}", reply.getNumDataElements(), sprogState); 452 if (unsolicited) { 453 log.debug("Unsolicited Reply"); 454 reply.setUnsolicited(); 455 } 456 // Insert the id 457 reply.setId(lastId); 458 notifyReply(reply, lastSender); 459 log.debug("Notify() wait"); 460 replyAvailable = true; 461 synchronized(lock) { 462 lock.notifyAll(); 463 } 464 465 //Create a new reply, ready to be filled 466 reply = new SprogReply(); 467 } 468 469 public void dispose(){ 470 tcThread.interrupt(); 471 try { 472 tcThread.join(); 473 } catch (InterruptedException e) { 474 // Do nothing 475 } 476 } 477 478 private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(SprogTrafficController.class); 479 480}