From 978c94fba63197936a2ffdf270872fd3f1c140fb Mon Sep 17 00:00:00 2001 From: tomazas Date: Mon, 17 Aug 2015 12:47:42 +0300 Subject: [PATCH 1/2] Fix concurrent AVP encoding issue - AvpDataException: Not enough data in buffer! --- .../jdiameter/client/impl/SessionImpl.java | 63 ++++++++++--- .../jdiameter/client/impl/parser/AvpImpl.java | 58 +++++++----- .../client/impl/parser/AvpSetImpl.java | 71 +++++++++++---- .../client/impl/parser/ElementParser.java | 90 +++++++++++++++---- .../client/impl/parser/MessageParser.java | 49 +++++++++- .../transport/tcp/TCPClientConnection.java | 11 ++- .../transport/tcp/TCPTransportClient.java | 13 ++- 7 files changed, 275 insertions(+), 80 deletions(-) diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/SessionImpl.java b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/SessionImpl.java index b87f510b7..f0a7c7a2b 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/SessionImpl.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/SessionImpl.java @@ -22,15 +22,30 @@ package org.jdiameter.client.impl; -import org.jdiameter.api.*; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import org.jdiameter.api.Answer; +import org.jdiameter.api.ApplicationId; +import org.jdiameter.api.Avp; +import org.jdiameter.api.EventListener; +import org.jdiameter.api.IllegalDiameterStateException; +import org.jdiameter.api.InternalException; +import org.jdiameter.api.Message; +import org.jdiameter.api.NetworkReqListener; +import org.jdiameter.api.OverloadException; +import org.jdiameter.api.RawSession; +import org.jdiameter.api.Request; +import org.jdiameter.api.RouteException; +import org.jdiameter.client.api.IAssembler; import org.jdiameter.client.api.IContainer; import org.jdiameter.client.api.IMessage; import org.jdiameter.client.api.IRequest; import org.jdiameter.client.api.ISession; import org.jdiameter.client.api.parser.IMessageParser; import org.jdiameter.common.api.data.ISessionDatasource; - -import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implementation for {@link ISession} @@ -41,6 +56,10 @@ */ public class SessionImpl extends BaseSessionImpl implements ISession { + private static final Logger logger = LoggerFactory.getLogger(SessionImpl.class); + + private Semaphore lock = new Semaphore(1); // container lock + SessionImpl(IContainer container) { setContainer(container); try { @@ -52,8 +71,15 @@ public class SessionImpl extends BaseSessionImpl implements ISession { } void setContainer(IContainer container) { - this.container = container; - this.parser = (IMessageParser) container.getAssemblerFacility().getComponentInstance(IMessageParser.class); + try { + lock.acquire(); // allow container change only if not releasing + this.container = container; + this.parser = (IMessageParser) container.getAssemblerFacility().getComponentInstance(IMessageParser.class); + } catch (InterruptedException e) { + logger.error("failure getting lock", e); + } finally { + lock.release(); + } } public void send(Message message, EventListener listener) throws InternalException, IllegalDiameterStateException, RouteException, OverloadException { @@ -132,14 +158,25 @@ public Request createRequest(Request prevRequest) { public void release() { isValid = false; - if (container != null) { - container.removeSessionListener(sessionId); - // FIXME - container.getAssemblerFacility().getComponentInstance(ISessionDatasource.class).removeSession(sessionId); - } - container = null; - parser = null; - reqListener = null; + + try { + lock.acquire(); // prevent container NullPointerException + + if (container != null) { + container.removeSessionListener(sessionId); + IAssembler assembler = container.getAssemblerFacility(); + ISessionDatasource datasource = assembler.getComponentInstance(ISessionDatasource.class); + datasource.removeSession(sessionId); + } + + container = null; + parser = null; + reqListener = null; + } catch (InterruptedException e) { + logger.error("failure getting lock", e); + } finally { + lock.release(); + } } public boolean isWrapperFor(Class iface) throws InternalException { diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/parser/AvpImpl.java b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/parser/AvpImpl.java index 0df258355..c1aff54f1 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/parser/AvpImpl.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/parser/AvpImpl.java @@ -25,6 +25,7 @@ import java.net.InetAddress; import java.net.URISyntaxException; import java.net.UnknownServiceException; +import java.util.Arrays; import java.util.Date; import org.jdiameter.api.Avp; @@ -45,15 +46,15 @@ class AvpImpl implements Avp { private static final long serialVersionUID = 1L; private static final ElementParser parser = new ElementParser(); - int avpCode; - long vendorID; + private int avpCode; + private long vendorID; - boolean isMandatory = false; - boolean isEncrypted = false; - boolean isVendorSpecific = false; + private boolean isMandatory = false; + private boolean isEncrypted = false; + private boolean isVendorSpecific = false; - byte[] rawData = new byte[0]; - AvpSet groupedData; + private byte[] rawData = null; + private AvpSet groupedData = new AvpSetImpl(); private static final Logger logger = LoggerFactory.getLogger(AvpImpl.class); @@ -65,7 +66,10 @@ class AvpImpl implements Avp { isVendorSpecific = (flags & 0x80) != 0; // vendorID = vnd; - rawData = data; + + if (data != null) { // any data string/int/encoded-grouped + rawData = Arrays.copyOf(data, data.length); + } } AvpImpl(Avp avp) { @@ -75,13 +79,19 @@ class AvpImpl implements Avp { isEncrypted = avp.isEncrypted(); isVendorSpecific = avp.isVendorId(); try { - rawData = avp.getRaw(); - if (rawData == null || rawData.length == 0) { - groupedData = avp.getGrouped(); - } - } - catch (AvpDataException e) { - logger.debug("Can not create Avp", e); + byte[] data = avp.getRaw(); + if (data != null) { // simple AVP + rawData = Arrays.copyOf(data, data.length); + } else { + // grouped AVP + AvpSet grouped = avp.getGrouped(); + + if (grouped != null) { + groupedData = parser.decodeAvpSet(parser.encodeAvpSet(grouped)); // copy all + } + } + } catch (Exception e) { + logger.error("Can not create Avp", e); } } @@ -224,13 +234,12 @@ public URI getDiameterURI() throws AvpDataException { public AvpSet getGrouped() throws AvpDataException { try { - if (groupedData == null) { - groupedData = parser.decodeAvpSet(rawData); - rawData = new byte[0]; - } - return groupedData; - } - catch (Exception e) { + if (rawData != null) { + return parser.decodeAvpSet(rawData); + } else { + return groupedData; + } + } catch (Exception e) { throw new AvpDataException(e, this); } } @@ -244,7 +253,7 @@ public T unwrap(Class aClass) throws InternalException { } public byte[] getRawData() { - return (rawData == null || rawData.length == 0) ? parser.encodeAvpSet(groupedData) : rawData; + return rawData; } // Caching toString.. Avp shouldn't be modified once created. @@ -253,7 +262,8 @@ public byte[] getRawData() { @Override public String toString() { if(toString == null) { - this.toString = new StringBuffer("AvpImpl [avpCode=").append(avpCode).append(", vendorID=").append(vendorID).append("]@").append(super.hashCode()).toString(); + this.toString = new StringBuffer("AvpImpl [avpCode=").append(avpCode).append(", vendorID=").append(vendorID) + .append(", len=").append((rawData != null) ? rawData.length : null).append("]@").append(super.hashCode()).toString(); } return this.toString; diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/parser/AvpSetImpl.java b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/parser/AvpSetImpl.java index 9eb6bb3ed..a8f48f286 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/parser/AvpSetImpl.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/parser/AvpSetImpl.java @@ -37,7 +37,10 @@ import java.util.Iterator; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.jdiameter.api.Avp; +import org.jdiameter.api.AvpDataException; import org.jdiameter.api.AvpSet; import org.jdiameter.api.InternalException; import org.jdiameter.api.URI; @@ -53,6 +56,8 @@ class AvpSetImpl implements AvpSet { // FIXME: by default 3588.4-1 says: 'M' should be set to true; // FIXME: by default 3588.x says: if grouped has at least on AVP with 'M' set, it also has to have 'M' set! - TODO: add backmapping. + + private static final Logger logger = LoggerFactory.getLogger(AvpSetImpl.class); private static final long serialVersionUID = 1L; private static final ElementParser parser = new ElementParser(); @@ -187,10 +192,15 @@ public Avp insertAvp(int index, int avpCode, long value, long vndId, boolean mFl } public AvpSet insertGroupedAvp(int index, int avpCode) { - AvpImpl res = new AvpImpl(avpCode, 0, 0, new byte[0]); - res.groupedData = new AvpSetImpl(); + AvpImpl res = new AvpImpl(avpCode, 0, 0, null); this.avps.add(index, res); - return res.groupedData; + + try { + return res.getGrouped(); + } catch (AvpDataException e) { + logger.error("insert avp failed", e); + } + return null; } public int size() { @@ -421,26 +431,41 @@ public Avp addAvp(int avpCode, Date value, long vndId, boolean mFlag, boolean pF } public AvpSet addGroupedAvp(int avpCode) { - AvpImpl res = new AvpImpl(avpCode, 0, 0, new byte[0] ); - res.groupedData = new AvpSetImpl(); + AvpImpl res = new AvpImpl(avpCode, 0, 0, null); this.avps.add(res); - return res.groupedData; + + try { + return res.getGrouped(); + } catch (AvpDataException e) { + logger.error("add avp failed", e); + } + return null; } public AvpSet addGroupedAvp(int avpCode, boolean mFlag, boolean pFlag) { int flags = ((mFlag ? 0x40:0) | (pFlag ? 0x20:0)); - AvpImpl res = new AvpImpl(avpCode, flags, 0, new byte[0] ); - res.groupedData = new AvpSetImpl(); + AvpImpl res = new AvpImpl(avpCode, flags, 0, null); this.avps.add(res); - return res.groupedData; + + try { + return res.getGrouped(); + } catch (AvpDataException e) { + logger.error("add avp failed", e); + } + return null; } public AvpSet addGroupedAvp(int avpCode, long vndId, boolean mFlag, boolean pFlag) { int flags = ((vndId !=0 ? 0x80:0) | (mFlag ? 0x40:0) | (pFlag ? 0x20:0)); - AvpImpl res = new AvpImpl(avpCode, flags, vndId, new byte[0] ); - res.groupedData = new AvpSetImpl(); + AvpImpl res = new AvpImpl(avpCode, flags, vndId, null); this.avps.add(res); - return res.groupedData; + + try { + return res.getGrouped(); + } catch (AvpDataException e) { + logger.error("add avp failed", e); + } + return null; } public Avp insertAvp(int index, int avpCode, byte[] value) { @@ -654,18 +679,28 @@ public Avp insertAvp(int index, int avpCode, Date value, long vndId, boolean mFl public AvpSet insertGroupedAvp(int index, int avpCode, boolean mFlag, boolean pFlag) { int flags = ((mFlag ? 0x40:0) | (pFlag ? 0x20:0)); - AvpImpl res = new AvpImpl(avpCode, flags, 0, new byte[0] ); - res.groupedData = new AvpSetImpl(); + AvpImpl res = new AvpImpl(avpCode, flags, 0, null); this.avps.add(index, res); - return res.groupedData; + + try { + return res.getGrouped(); + } catch (AvpDataException e) { + logger.error("insert avp failed", e); + } + return null; } public AvpSet insertGroupedAvp(int index, int avpCode, long vndId, boolean mFlag, boolean pFlag) { int flags = ((vndId !=0 ? 0x80:0) | (mFlag ? 0x40:0) | (pFlag ? 0x20:0)); - AvpImpl res = new AvpImpl(avpCode, flags, vndId, new byte[0] ); - res.groupedData = new AvpSetImpl(); + AvpImpl res = new AvpImpl(avpCode, flags, vndId, null); this.avps.add(index, res); - return res.groupedData; + + try { + return res.getGrouped(); + } catch (AvpDataException e) { + logger.error("insert avp failed", e); + } + return null; } public boolean isWrapperFor(Class aClass) throws InternalException { diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/parser/ElementParser.java b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/parser/ElementParser.java index 502251f41..5890dbd3d 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/parser/ElementParser.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/parser/ElementParser.java @@ -25,6 +25,7 @@ import org.jdiameter.api.Avp; import org.jdiameter.api.AvpDataException; import org.jdiameter.api.AvpSet; +import org.jdiameter.api.InternalException; import org.jdiameter.client.api.parser.ParseException; import org.jdiameter.client.api.parser.IElementParser; import org.slf4j.Logger; @@ -50,7 +51,7 @@ */ public class ElementParser implements IElementParser { - private static final Logger logger = LoggerFactory.getLogger(ElementParser.class); + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(ElementParser.class); /** * This is seconds shift (70 years in seconds) applied to date, * since NTP date starts since 1900, not 1970. @@ -251,6 +252,42 @@ public T bytesToObject(java.lang.Class iface, byte[] rawdata) throws AvpD public AvpSetImpl decodeAvpSet(byte[] buffer) throws IOException, AvpDataException { return this.decodeAvpSet(buffer, 0); } + + private String fullDecode(byte[] buffer, int shift) throws IOException { + DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer, shift, buffer.length)); + StringBuilder sb = new StringBuilder(); + + int read = shift; + while (read < buffer.length) { + int code = in.readInt(); + int flags = in.readInt(); + int consumed = 8; + + int vendor = ((flags & 0x80) != 0) ? in.readInt() : -1; + if (vendor != -1) { + consumed += 4; + } + + int length = (int)(flags & 0xFFFFFF) - consumed; + int padding = ((length % 4) != 0) ? (4 - (length % 4)) : 0; + length += padding; + + int num_read = ((read + consumed + length) > buffer.length) ? (buffer.length-consumed-read) : length; + int skip = num_read; + + while (skip > 0) { + skip -= in.skipBytes(skip); + } + + read += (consumed + num_read); + + sb.append(" code: ").append(code).append(" flags: ").append(flags) + .append(" len: ").append(length).append(" pad: ").append(padding) + .append(" skip: ").append(skip).append(" read: ").append(read).append("\n"); + } + + return sb.toString(); + } /** * @@ -262,6 +299,11 @@ public AvpSetImpl decodeAvpSet(byte[] buffer) throws IOException, AvpDataExcepti */ public AvpSetImpl decodeAvpSet(byte[] buffer, int shift) throws IOException, AvpDataException { AvpSetImpl avps = new AvpSetImpl(); + + if (buffer == null) { + return avps; // empty + } + int tmp, counter = shift; DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer, shift, buffer.length /* - shift ? */)); @@ -270,8 +312,11 @@ public AvpSetImpl decodeAvpSet(byte[] buffer, int shift) throws IOException, Avp tmp = in.readInt(); int flags = (tmp >> 24) & 0xFF; int length = tmp & 0xFFFFFF; + if(length < 0 || counter + length > buffer.length) { - throw new AvpDataException("Not enough data in buffer!"); + logger.error("unable to decode code: {}, flags: {}, length: {}, counter: {}, shift:{}, buf_size: {}\n{}\n{}", + new Object[]{code, (short)flags, length, counter, shift, buffer.length, MessageParser.byteArrayToHexString(buffer), fullDecode(buffer,shift)}); + throw new AvpDataException("Not enough data in buffer!"); } long vendor = 0; if ((flags & 0x80) != 0) { @@ -295,33 +340,44 @@ public AvpSetImpl decodeAvpSet(byte[] buffer, int shift) throws IOException, Avp } public byte[] encodeAvpSet(AvpSet avps) { + return encodeAvpSet(avps, null, ""); + } + + public byte[] encodeAvpSet(AvpSet avps, StringBuilder sb, String prefix) { ByteArrayOutputStream out = new ByteArrayOutputStream(); try { DataOutputStream data = new DataOutputStream(out); for (Avp a : avps) { - if (a instanceof AvpImpl) { - AvpImpl aImpl = (AvpImpl) a; - if (aImpl.rawData.length == 0 && aImpl.groupedData != null) { - aImpl.rawData = encodeAvpSet(a.getGrouped()); + byte[] raw = a.getRaw(); + byte[] enc = encodeAvp(a); + + if (sb != null) { + sb.append(prefix).append(a).append(", len: ").append((raw != null) ? raw.length : null) + .append(", enc: ").append(MessageParser.byteArrayToHexStringLine(enc)).append("\n"); } - data.write(encodeAvp(aImpl)); - } + + data.write(enc); } - } - catch (Exception e) { - logger.debug("Error during encode avps", e); + } catch (Exception e) { + logger.error("Error during encode avps", e); } return out.toByteArray(); - } + } - public byte[] encodeAvp(AvpImpl avp) { + public byte[] encodeAvp(Avp avp) { ByteArrayOutputStream out = new ByteArrayOutputStream(); try { DataOutputStream data = new DataOutputStream(out); data.writeInt(avp.getCode()); int flags = (byte) ((avp.getVendorId() != 0 ? 0x80 : 0) | (avp.isMandatory() ? 0x40 : 0) | (avp.isEncrypted() ? 0x20 : 0)); - int origLength = avp.getRaw().length + 8 + (avp.getVendorId() != 0 ? 4 : 0); + + byte[] raw = avp.getRaw(); + if (raw == null) { + raw = encodeAvpSet(avp.getGrouped()); + } + + int origLength = raw.length + 8 + (avp.getVendorId() != 0 ? 4 : 0); // newLength is never used. Should it? //int newLength = origLength; //if (newLength % 4 != 0) { @@ -331,9 +387,9 @@ public byte[] encodeAvp(AvpImpl avp) { if (avp.getVendorId() != 0) { data.writeInt((int) avp.getVendorId()); } - data.write(avp.getRaw()); - if (avp.getRaw().length % 4 != 0) { - for(int i = 0; i < 4 - avp.getRaw().length % 4; i++) { + data.write(raw); + if (raw.length % 4 != 0) { + for(int i = 0; i < 4 - raw.length % 4; i++) { data.write(0); } } diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/parser/MessageParser.java b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/parser/MessageParser.java index 5ccc3ac8d..1526fb820 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/parser/MessageParser.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/parser/MessageParser.java @@ -74,7 +74,7 @@ */ public class MessageParser extends ElementParser implements IMessageParser { - private static final Logger logger = LoggerFactory.getLogger(MessageParser.class); + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(MessageParser.class); protected UIDGenerator endToEndGen = new UIDGenerator( (int) (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) & 0xFFF) << 20 @@ -239,11 +239,48 @@ void copyBasicAvps(IMessage newMessage, IMessage prnMessage, boolean invertPoint // } } } + + public static String byteArrayToHexString(byte in[]) { + return byteArrayToHexString(in, true); + } + + public static String byteArrayToHexString(byte in[], boolean columnize) { + if (in == null || in.length <= 0) return ""; + String pseudo = "0123456789ABCDEF"; + + StringBuffer out = new StringBuffer(in.length * 3); + + for (int i=0; i < in.length; i++) { + byte ch = in[i]; + out.append(pseudo.charAt((int) ((ch & 0xF0) >> 4))); + out.append(pseudo.charAt((int) (ch & 0x0F))); + + if (columnize) { + if ((i+1)%16 == 0) { + out.append("\n"); + } else if ((i+1)%4 == 0) { + out.append(" "); + } + } + } + + return out.toString(); + } + + public static String byteArrayToHexStringLine(byte in[]) { + return byteArrayToHexString(in, false); + } public ByteBuffer encodeMessage(IMessage message) throws ParseException { ByteArrayOutputStream out = new ByteArrayOutputStream(); + + StringBuilder sb = null; + if (logger.isTraceEnabled()) { + sb = new StringBuilder(); + } + try { - byte[] rawData = encodeAvpSet(message.getAvps()); + byte[] rawData = encodeAvpSet(message.getAvps(), sb, ""); DataOutputStream data = new DataOutputStream(out); // Wasting processor time, are we ? // int tmp = (1 << 24) & 0xFF000000; @@ -265,7 +302,13 @@ public ByteBuffer encodeMessage(IMessage message) throws ParseException { throw new ParseException("Failed to encode message.", e); } try{ - return prepareBuffer(out.toByteArray(), out.size()); + byte[] enc = out.toByteArray(); + + if (logger.isTraceEnabled()) { + String hex = byteArrayToHexString(enc); + logger.trace("encoded message {} of size [{}]:\n{}\n{}", new Object[]{message, enc.length, sb.toString(), hex}); + } + return prepareBuffer(enc, out.size()); } catch(AvpDataException ade) { throw new ParseException(ade); diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tcp/TCPClientConnection.java b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tcp/TCPClientConnection.java index 1e95e65fa..8617f30dc 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tcp/TCPClientConnection.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tcp/TCPClientConnection.java @@ -32,6 +32,7 @@ import org.jdiameter.client.api.io.TransportError; import org.jdiameter.client.api.io.TransportException; import org.jdiameter.client.api.parser.IMessageParser; +import org.jdiameter.client.impl.parser.MessageParser; import org.jdiameter.common.api.concurrent.IConcurrentFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +56,7 @@ */ public class TCPClientConnection implements IConnection { - private static Logger logger = LoggerFactory.getLogger(TCPClientConnection.class); + private static org.slf4j.Logger logger = LoggerFactory.getLogger(TCPClientConnection.class); private final long createdTime; private TCPTransportClient client; @@ -263,7 +264,13 @@ protected void onDisconnect() throws AvpDataException { protected void onMessageReceived(ByteBuffer message) throws AvpDataException { if (logger.isDebugEnabled()) { - logger.debug("Received message of size [{}]", message.array().length); + if (logger.isTraceEnabled()) { + String hex = MessageParser.byteArrayToHexString(message.array()); + logger.trace("Received message of size [{}]\n{}", + new Object[] {message.array().length, hex}); + } else { + logger.debug("Received message of size [{}]", message.array().length); + } } onEvent(new Event(EventType.MESSAGE_RECEIVED, message)); } diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tcp/TCPTransportClient.java b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tcp/TCPTransportClient.java index d38329d08..50a27b2a8 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tcp/TCPTransportClient.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tcp/TCPTransportClient.java @@ -44,6 +44,7 @@ import org.jdiameter.api.AvpDataException; import org.jdiameter.client.api.io.NotInitializedException; +import org.jdiameter.client.impl.parser.MessageParser; import org.jdiameter.common.api.concurrent.IConcurrentFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,7 +96,7 @@ public class TCPTransportClient implements Runnable { private String socketDescription = null; - private static final Logger logger = LoggerFactory.getLogger(TCPTransportClient.class); + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(TCPTransportClient.class); //PCB - allow non blocking IO private static final boolean BLOCKING_IO = false; @@ -290,8 +291,14 @@ public InetSocketAddress getOrigAddress() { } public void sendMessage(ByteBuffer bytes) throws IOException { - if (logger.isDebugEnabled()) { - logger.debug("About to send a byte buffer of size [{}] over the TCP nio socket [{}]", bytes.array().length, socketDescription); + if (logger.isDebugEnabled()) { + if (logger.isTraceEnabled()) { + String hex = MessageParser.byteArrayToHexString(bytes.array()); + logger.trace("About to send a byte buffer of size [{}] over the TCP nio socket [{}]\n{}", + new Object[]{bytes.array().length, socketDescription, hex}); + } else { + logger.debug("About to send a byte buffer of size [{}] over the TCP nio socket [{}]", bytes.array().length, socketDescription); + } } int rc = 0; // PCB - removed locking From 47448f3989c8f36b26a1d33e2fb0630e9ca8c092 Mon Sep 17 00:00:00 2001 From: darius Date: Wed, 2 Sep 2015 12:10:50 +0300 Subject: [PATCH 2/2] Fixing issue that peer is not added after initiating jmx-console command, also adding logging and adding condition to skip adding peer to realm if it already exists --- .../server/impl/MutablePeerTableImpl.java | 15 ++++++++++++++- .../diameter/stack/DiameterStackMultiplexer.java | 4 ++-- .../stack/DiameterStackMultiplexerMBean.java | 3 ++- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/MutablePeerTableImpl.java b/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/MutablePeerTableImpl.java index 4e27b2cdf..1180161df 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/MutablePeerTableImpl.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/MutablePeerTableImpl.java @@ -623,6 +623,7 @@ public Peer addPeer(URI peerURI, String realm, boolean connecting) { //TODO: add sKey here, now it adds peer to all realms. //TODO: better, separate addPeer from realm! try { + logger.debug("Adding peer, URI-{}, realm-{}, connecting-{}", new Object[] {peerURI, realm, connecting}); Configuration peerConfig = null; Configuration[] peers = config.getChildren(PeerTable.ordinal()); // find peer config @@ -647,7 +648,19 @@ public Peer addPeer(URI peerURI, String realm, boolean connecting) { Collection realms = this.router.getRealmTable().getRealms(realm); for (Realm r : realms) { if (r.getName().equals(realm)) { - ((IRealm)r).addPeerName(peerURI.toString()); + boolean peerNameFound = false; + for (String peerName : ((IRealm)r).getPeerNames()) { + if (peerName != null && peerName.equals(peerURI.getFQDN())) { + peerNameFound = true; + break; + } + } + if (!peerNameFound) { + ((IRealm)r).addPeerName(peerURI.getFQDN()); + logger.debug("Adding peerName-{} to realm-{}", peerURI.getFQDN(), realm); + } else { + logger.debug("Skipped adding peerName-{} to realm-{}, because it already exists", peerURI.getFQDN(), realm); + } found = true; break; } diff --git a/core/mux/jar/src/main/java/org/mobicents/diameter/stack/DiameterStackMultiplexer.java b/core/mux/jar/src/main/java/org/mobicents/diameter/stack/DiameterStackMultiplexer.java index 47f78477c..14dc3be07 100644 --- a/core/mux/jar/src/main/java/org/mobicents/diameter/stack/DiameterStackMultiplexer.java +++ b/core/mux/jar/src/main/java/org/mobicents/diameter/stack/DiameterStackMultiplexer.java @@ -641,10 +641,10 @@ public void _LocalPeer_setVendorId(long vendorId) throws MBeanException { * (non-Javadoc) * @see org.mobicents.diameter.stack.DiameterStackMultiplexerMBean#_Network_Peers_addPeer(java.lang.String, boolean, int) */ - public void _Network_Peers_addPeer(String name, boolean attemptConnect, int rating) throws MBeanException { + public void _Network_Peers_addPeer(String name, boolean attemptConnect, int rating, String realm) throws MBeanException { try { NetworkImpl n = (NetworkImpl) stack.unwrap(Network.class); - /*Peer p =*/ n.addPeer(name, "", attemptConnect); // FIXME: This requires realm... + n.addPeer(name, realm, attemptConnect); } catch (IllegalArgumentException e) { logger.warn(e.getMessage()); diff --git a/core/mux/jar/src/main/java/org/mobicents/diameter/stack/DiameterStackMultiplexerMBean.java b/core/mux/jar/src/main/java/org/mobicents/diameter/stack/DiameterStackMultiplexerMBean.java index 8736bfbd1..c5cc28af0 100644 --- a/core/mux/jar/src/main/java/org/mobicents/diameter/stack/DiameterStackMultiplexerMBean.java +++ b/core/mux/jar/src/main/java/org/mobicents/diameter/stack/DiameterStackMultiplexerMBean.java @@ -201,9 +201,10 @@ public interface DiameterStackMultiplexerMBean extends ServiceMBean { * @param name the name/uri of the peer * @param attemptConnect indicates if the stack should try to connect to this peer or wait for incoming connection * @param rating the peer rating for decision on message routing + * @param realm name of the realm * @throws MBeanException if the operation is unable to perform correctly */ - public void _Network_Peers_addPeer(String name, boolean attemptConnect, int rating) throws MBeanException; + public void _Network_Peers_addPeer(String name, boolean attemptConnect, int rating, String realm) throws MBeanException; /** * Removes a peer definition from stack.