001package jmri.jmrix.roco.z21; 002 003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; 004import java.net.DatagramPacket; 005import java.util.ArrayList; 006import java.util.Arrays; 007import java.util.List; 008 009import jmri.jmrix.*; 010import org.slf4j.Logger; 011import org.slf4j.LoggerFactory; 012 013/** 014 * Abstract base for TrafficControllers in a Message/Reply protocol. 015 * 016 * @author Paul Bender Copyright (C) 2014 017 */ 018public class Z21TrafficController extends jmri.jmrix.AbstractMRTrafficController implements Z21Interface { 019 020 private java.net.InetAddress host; 021 private int port; 022 023 public Z21TrafficController() { 024 super(); 025 allowUnexpectedReply = true; 026 } 027 028 /** 029 * Implement this to forward a specific message type to a protocol-specific 030 * listener interface. This puts the casting into the concrete class. 031 */ 032 @Override 033 protected void forwardMessage(AbstractMRListener client, AbstractMRMessage m) { 034 ((Z21Listener) client).message((Z21Message) m); 035 } 036 037 /** 038 * Implement this to forward a specific Reply type to a protocol-specific 039 * listener interface. This puts the casting into the concrete class. 040 */ 041 @Override 042 protected void forwardReply(AbstractMRListener client, AbstractMRReply m) { 043 ((Z21Listener) client).reply((Z21Reply) m); 044 } 045 046 /** 047 * Invoked if it's appropriate to do low-priority polling of the command 048 * station, this should return the next message to send, or null if the TC 049 * should just sleep. 050 */ 051 @Override 052 protected Z21Message pollMessage() { 053 return null; 054 } 055 056 @Override 057 protected Z21Listener pollReplyHandler() { 058 return null; 059 } 060 061 /** 062 * enterProgMode() and enterNormalMode() return any message that 063 * needs to be returned to the command station to change modes. 064 * 065 * @see #enterNormalMode() 066 * @return if no message is needed, you may return null. 067 * 068 * If the programmerIdle() function returns true, enterNormalMode() is 069 * called after a timeout while in IDLESTATE during programming to 070 * return the system to normal mode. 071 */ 072 @Override 073 protected Z21Message enterProgMode() { 074 return null; 075 } 076 077 /** 078 * enterProgMode() and enterNormalMode() return any message that 079 * needs to be returned to the command station to change modes. 080 * 081 * @see #enterProgMode() 082 * @return if no message is needed, you may return null. 083 */ 084 @Override 085 protected Z21Message enterNormalMode() { 086 return null; 087 } 088 089 /** 090 * Actually transmits the next message to the port. 091 */ 092 @SuppressFBWarnings(value = {"TLW_TWO_LOCK_WAIT", "", "UW_UNCOND_WAIT"}, 093 justification = "Two locks needed for synchronization here, this is OK; String + only used for debug, so inefficient String processing not really a problem; Unconditional Wait is to give external hardware, which doesn't necessarilly respond, time to process the data.") 094 @Override 095 synchronized protected void forwardToPort(AbstractMRMessage m, AbstractMRListener reply) { 096 if (log.isDebugEnabled()) { 097 log.debug("forwardToPort message: [{}]", m); 098 } 099 // remember who sent this 100 mLastSender = reply; 101 102 // forward the message to the registered recipients, 103 // which includes the communications monitor, except the sender. 104 // Schedule notification via the Swing event queue to ensure order 105 Runnable r = new XmtNotifier(m, mLastSender, this); 106 javax.swing.SwingUtilities.invokeLater(r); 107 108 // stream to port in single write, as that's needed by serial 109 byte[] msg = new byte[lengthOfByteStream(m)]; 110 // add header 111 int offset = addHeaderToOutput(msg, m); 112 113 // add data content 114 int len = m.getNumDataElements(); 115 for (int i = 0; i < len; i++) { 116 msg[i + offset] = (byte) m.getElement(i); 117 } 118 // add trailer 119 addTrailerToOutput(msg, len + offset, m); 120 // and send the bytes 121 try { 122 if (log.isDebugEnabled()) { 123 StringBuilder f = new StringBuilder("formatted message: "); 124 for (byte b : msg) { 125 f.append(Integer.toHexString(0xFF & b)); 126 f.append(" "); 127 } 128 log.debug(new String(f)); 129 } 130 while (m.getRetries() >= 0) { 131 if (portReadyToSend(controller)) { 132 // create a datagram with the data from the 133 // message. 134 byte[] data = ((Z21Message) m).getBuffer(); 135 DatagramPacket sendPacket 136 = new DatagramPacket(data, ((Z21Message) m).getLength(), host, port); 137 // and send it. 138 ((Z21Adapter) controller).getSocket().send(sendPacket); 139 log.debug("written, msg timeout: {} mSec", m.getTimeout()); 140 break; 141 } else if (m.getRetries() >= 0) { 142 if (log.isDebugEnabled()) { 143 StringBuilder b = new StringBuilder("Retry message: "); 144 b.append(m.toString()); 145 b.append(" attempts remaining: "); 146 b.append(m.getRetries()); 147 log.debug(new String(b)); 148 } 149 m.setRetries(m.getRetries() - 1); 150 try { 151 synchronized (xmtRunnable) { 152 xmtRunnable.wait(m.getTimeout()); 153 } 154 } catch (InterruptedException e) { 155 Thread.currentThread().interrupt(); // retain if needed later 156 if(!threadStopRequest) { 157 log.error("retry wait interrupted"); 158 } else { 159 log.error("retry wait interrupted during thread stop"); 160 } 161 } 162 } else { 163 log.warn("sendMessage: port not ready for data sending: {}", java.util.Arrays.toString(msg)); 164 } 165 } 166 } catch (Exception e) { 167 // TODO Currently there's no port recovery if an exception occurs 168 // must restart JMRI to clear xmtException. 169 xmtException = true; 170 portWarn(e); 171 } 172 } 173 174 @Override() 175 public boolean status() { 176 if (controller == null) { 177 return false; 178 } else { 179 return (controller.status()); 180 } 181 } 182 183 /** 184 * Make connection to existing PortController object. 185 */ 186 @Override 187 public void connectPort(AbstractPortController p) { 188 rcvException = false; 189 xmtException = false; 190 if (controller != null) { 191 log.warn("connectPort: connect called while connected"); 192 } else { 193 log.debug("connectPort invoked"); 194 } 195 if (!(p instanceof Z21Adapter)) { 196 throw new IllegalArgumentException("attempt to connect wrong port type"); 197 } 198 controller = p; 199 try { 200 host = java.net.InetAddress.getByName(((Z21Adapter) controller).getHostName()); 201 port = ((Z21Adapter) controller).getPort(); 202 ConnectionStatus.instance().setConnectionState( 203 p.getSystemConnectionMemo(), ConnectionStatus.CONNECTION_UP); 204 } catch (java.net.UnknownHostException uhe) { 205 log.error("Unknown Host: {} ", ((Z21Adapter) controller).getHostName()); 206 ConnectionStatus.instance().setConnectionState( 207 p.getSystemConnectionMemo(), ConnectionStatus.CONNECTION_DOWN); 208 } 209 // and start threads 210 xmtThread = new Thread(xmtRunnable = () -> { 211 try { 212 transmitLoop(); 213 } catch (Throwable e) { 214 if(!threadStopRequest) 215 log.error("Transmit thread terminated prematurely by: {}", e.toString(), e); 216 } 217 }); 218 xmtThread.setName("z21.Z21TrafficController Transmit thread"); 219 xmtThread.start(); 220 rcvThread = new Thread(this::receiveLoop); 221 rcvThread.setName("z21.Z21TrafficController Receive thread"); 222 int xr = rcvThread.getPriority(); 223 xr++; 224 rcvThread.setPriority(xr); //bump up the priority 225 rcvThread.start(); 226 } 227 228 /** 229 * Break connection to existing PortController object. Once broken, attempts 230 * to send via "message" member will fail. 231 */ 232 @Override 233 public void disconnectPort(AbstractPortController p) { 234 if (controller != p) { 235 log.warn("disconnectPort: disconnect called from non-connected AbstractPortController"); 236 } 237 controller = null; 238 } 239 240 @Override 241 protected Z21Reply newReply() { 242 return new Z21Reply(); 243 } 244 245 @Override 246 protected boolean endOfMessage(AbstractMRReply r) { 247 // since this is a UDP protocol, and each reply in the packet is complete, 248 // we don't check for end of message manually. 249 return true; 250 } 251 252 /** 253 * Handle each reply when complete. 254 * <p> 255 * (This is public for testing purposes) Runs in the "Receive" thread. 256 */ 257 @SuppressFBWarnings(value = {"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP", "NO_NOTIFY_NOT_NOTIFYALL"}, 258 justification = "Wait is for external hardware, which doesn't necessarilly respond, to process the data. Notify is used because Having more than one thread waiting on xmtRunnable is an error.") 259 @Override 260 public void handleOneIncomingReply() throws java.io.IOException { 261 // we sit in this until the message is complete, relying on 262 // threading to let other stuff happen 263 264 // create a buffer to hold the incoming data. 265 byte[] buffer = new byte[100]; // the size here just needs to be longer 266 // than the longest protocol message. 267 // Otherwise, the receive will truncate. 268 269 // create the packet. 270 DatagramPacket receivePacket = new DatagramPacket(buffer, 100, host, port); 271 272 // and wait to receive data in the packet. 273 try { 274 ((Z21Adapter) controller).getSocket().receive(receivePacket); 275 } catch (java.net.SocketException | NullPointerException se) { 276 // if we are waiting when the controller is disposed, 277 // a socket exception will be thrown. 278 log.debug("Socket exception during receive. Connection Closed?"); 279 rcvException = true; 280 return; 281 } 282 if (threadStopRequest) return; 283 284 // handle more than one reply in the same UDP packet. 285 List<Z21Reply> replies = new ArrayList<>(); 286 287 int totalLength=receivePacket.getLength(); 288 int consumed=0; 289 290 do { 291 int length = (0xff & buffer[0]) + ((0xff & buffer[1]) << 8); 292 Z21Reply msg = new Z21Reply(buffer, length); 293 294 replies.add(msg); 295 296 buffer = Arrays.copyOfRange(buffer,length,buffer.length); 297 consumed +=length; 298 log.trace("total length: {} consumed {}",totalLength,consumed); 299 } while(totalLength>consumed); 300 301 302 // and then dispatch each reply 303 replies.forEach(this::dispatchReply); 304 } 305 306 private void dispatchReply(Z21Reply msg) { 307 // message is complete, dispatch it !! 308 replyInDispatch = true; 309 if (log.isDebugEnabled()) { 310 log.debug("dispatch reply of length {} contains {} state {}", msg.getNumDataElements(), msg.toString(), mCurrentState); 311 } 312 313 // forward the message to the registered recipients, 314 // which includes the communications monitor 315 // return a notification via the Swing event queue to ensure proper thread 316 Runnable r = new RcvNotifier(msg, mLastSender, this); 317 try { 318 javax.swing.SwingUtilities.invokeAndWait(r); 319 } catch (InterruptedException ie) { 320 if(threadStopRequest) return; 321 log.error("Unexpected exception in invokeAndWait:{}", ie, ie); 322 } catch (Exception e) { 323 log.error("Unexpected exception in invokeAndWait:{}", e, e); 324 } 325 if (log.isDebugEnabled()) { 326 log.debug("dispatch thread invoked"); 327 } 328 329 if (!msg.isUnsolicited()) { 330 // effect on transmit: 331 switch (mCurrentState) { 332 case WAITMSGREPLYSTATE: { 333 // check to see if the response was an error message we want 334 // to automatically handle by re-queueing the last sent 335 // message, otherwise go on to the next message 336 if (msg.isRetransmittableErrorMsg()) { 337 if (log.isDebugEnabled()) { 338 log.debug("Automatic Recovery from Error Message: +msg.toString()"); 339 } 340 synchronized (xmtRunnable) { 341 mCurrentState = AUTORETRYSTATE; 342 replyInDispatch = false; 343 xmtRunnable.notify(); 344 } 345 } else { 346 // update state, and notify to continue 347 synchronized (xmtRunnable) { 348 mCurrentState = NOTIFIEDSTATE; 349 replyInDispatch = false; 350 xmtRunnable.notify(); 351 } 352 } 353 break; 354 } 355 case WAITREPLYINPROGMODESTATE: { 356 // entering programming mode 357 mCurrentMode = PROGRAMINGMODE; 358 replyInDispatch = false; 359 360 // check to see if we need to delay to allow decoders to become 361 // responsive 362 int warmUpDelay = enterProgModeDelayTime(); 363 if (warmUpDelay != 0) { 364 try { 365 synchronized (xmtRunnable) { 366 xmtRunnable.wait(warmUpDelay); 367 } 368 } catch (InterruptedException e) { 369 Thread.currentThread().interrupt(); // retain if needed later 370 if (threadStopRequest) return; 371 } 372 } 373 // update state, and notify to continue 374 synchronized (xmtRunnable) { 375 mCurrentState = OKSENDMSGSTATE; 376 xmtRunnable.notify(); 377 } 378 break; 379 } 380 case WAITREPLYINNORMMODESTATE: { 381 // entering normal mode 382 mCurrentMode = NORMALMODE; 383 replyInDispatch = false; 384 // update state, and notify to continue 385 synchronized (xmtRunnable) { 386 mCurrentState = OKSENDMSGSTATE; 387 xmtRunnable.notify(); 388 } 389 break; 390 } 391 default: { 392 replyInDispatch = false; 393 if (allowUnexpectedReply) { 394 if (log.isDebugEnabled()) { 395 log.debug("Allowed unexpected reply received in state: {} was {}", mCurrentState, msg.toString()); 396 } 397 synchronized (xmtRunnable) { 398 // The transmit thread sometimes gets stuck 399 // when unexpected replies are received. Notify 400 // it to clear the block without a timeout. 401 // (do not change the current state) 402 //if(mCurrentState!=IDLESTATE) 403 xmtRunnable.notify(); 404 } 405 } else { 406 unexpectedReplyStateError(mCurrentState,msg.toString()); 407 } 408 } 409 } 410 // Unsolicited message 411 } else { 412 if (log.isDebugEnabled()) { 413 log.debug("Unsolicited Message Received {}", msg.toString()); 414 } 415 416 replyInDispatch = false; 417 } 418 } 419 420 @SuppressFBWarnings(value = {"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP"}, 421 justification = "Wait is for external hardware, which doesn't necessarilly respond, to process the data.") 422 @Override 423 protected void terminate() { 424 if (controller == null) { 425 log.debug("terminate called while not connected"); 426 return; 427 } else { 428 log.debug("Cleanup Starts"); 429 } 430 431 Z21Message logoffMessage = Z21Message.getLanLogoffRequestMessage(); 432 forwardToPort(logoffMessage, null); 433 // wait for reply 434 try { 435 if (xmtRunnable != null) { 436 synchronized (xmtRunnable) { 437 xmtRunnable.wait(logoffMessage.getTimeout()); 438 } 439 } 440 } catch (InterruptedException e) { 441 Thread.currentThread().interrupt(); // retain if needed later 442 log.error("transmit interrupted"); 443 } finally { 444 // set the controller to null, even if terminate fails. 445 controller = null; 446 } 447 } 448 449 /** 450 * Terminate the receive and transmit threads. 451 * <p> 452 * This is intended to be used only by testing subclasses. 453 */ 454 @Override 455 public void terminateThreads() { 456 threadStopRequest = true; 457 // ensure socket closed to end pending operations 458 if ( controller != null && ((Z21Adapter) controller).getSocket() != null) ((Z21Adapter) controller).getSocket().close(); 459 460 // usual stop process 461 super.terminateThreads(); 462 } 463 464 // The methods to implement the Z21Interface 465 @Override 466 public synchronized void addz21Listener(Z21Listener l) { 467 this.addListener(l); 468 } 469 470 @Override 471 public synchronized void removez21Listener(Z21Listener l) { 472 this.removeListener(l); 473 } 474 475 /** 476 * Forward a preformatted message to the actual interface. 477 */ 478 @Override 479 public void sendz21Message(Z21Message m, Z21Listener reply) { 480 sendMessage(m, reply); 481 } 482 483 private static final Logger log = LoggerFactory.getLogger(Z21TrafficController.class); 484}