Download Reference Manual
The Developer's Library for D
About Wiki Forums Source Search Contact

Tango ClusterQueue Message Size > 2048

Moderators: kris

Posted: 10/05/09 08:17:35


I'm currently trying to use the Tango ClusterQueue?. If I try to send messages larger than 2048 bytes it always throws " No appropriate cluster nodes are available" on the client side. I identified that this problem is related to the problem that if the message body string is larger than 2048 the size of the message is set to zero in the ProtocolWriter? which in fact produces an exception in in line 140

// may throw an exception if the payload is too large to fit
// completely inside the buffer!
buffer.slice (size, false);

When extending and changing the GrowBuffer? size of 1024 * 2 to e.g. 1024 * 3 in line 385 everything works well for at least for a message size of 2049.

// this buffer will grow as required to house larger messages
buffer = new GrowBuffer (1024 * 2);
writer = new ProtocolWriter (buffer);

Does anybody knows whats going wrong on here? Its not an issue of the server code its related to the client code. The server works fine with the 1024 * 2 buffer but not the client. Any ideas? Thanks in advance. Tom.

Author Message

Posted: 10/05/09 08:58:14

This is the code that produces the problem. If you change the line auto tmp = new char[2048] to auto tmp = new char[2047] the code actually works fine.

module QueueClient;



import tango.util.log.Log, tango.util.log.AppendConsole;

class MyMessage : NetworkMessage
        public static this ()
             NetworkRegistry.instance.enroll (new MyMessage);
        void write (IWriter output)
             auto tmp = new char[2048];
             super.write (output);
             output (tmp);

void main ()
         auto appender = new AppendConsole;
         auto log = Log.getLogger("queue.persist");
         auto cluster = (new Cluster(log)).join();
         auto queue = new NetworkQueue (cluster, "queue.server");
         auto message = new MyMessage;

         queue.put (message);

Posted: 10/05/09 08:59:59 -- Modified: 10/05/09 09:01:39 by

The server code is the same as proposed in the QueueServer? documentation.

Posted: 10/06/09 06:45:32

I found out that the problem is somehow related to the put method

        ProtocolWriter put (Command cmd, char[] channel, char[] element = null, IMessage msg = null)
                auto time = (msg ? msg.time : Time.init);
                // reset the buffer first!

                auto content = cast(ubyte[]) buffer.getContent;
                emit (cast(ushort) 0)
                     (cast(ubyte) cmd)
                     (cast(ubyte) Version)
                     (cast(ulong) time.ticks)

                // is there a payload?
                if (msg)
                    NetworkRegistry.instance.freeze (emit, msg);

                // go back and write the total number of bytes
                auto size = buffer.limit;
                content[0] = cast(ubyte) (size >> 8);
                content[1] = cast(ubyte) (size & 0xff);
                return this;

If the message size is larger than 2048 the problem comes up. If you change the length of the content ubyte array to 4096 everything works fine.

auto content = cast(ubyte[]) buffer.getContent;
content.length = content.length * 2; // 2048 * 2

Does anybody knows how to solve this problem? It seems that content always has a size of 2048 as the GrowBuffer? is initialized with 2048 as its size. Therefore a message might be larger than the size of content. This actually produces the problem.

Posted: 10/06/09 12:33:26

found the problem and solution. see ticket 1756.