tango.net.cluster.tina.Cluster

License:

BSD style: see license.txt

Version:

July 2004: Initial release

Author:

Kris
class Cluster : Broadcaster, ICluster #
QOS implementation for sockets. All cluster-client activity is gated through here by the higher level classes; NetworkQueue & NetworkCache for example. You gain access to the cluster by creating an instance of the QOS (quality of service) you desire and either mapping client classes onto it, or usign it directly. For example:
1
2
3
4
5
6
7
8
import tango.net.cluster.tina.Cluster;

auto cluster = new Cluster;
cluster.join;

auto channel = cluster.createChannel (...);
channel.putQueue (...);
channel.getQueue ();
Please see the cluster clients for additional details. Currently these include CacheInvalidator, CacheInvalidatee, NetworkMessage, NetworkTask, NetworkQueue, NetworkCache, NetworkCombo, plus the Client base-class.
this() #
Create a cluster instance with a default logger and Nagle caching disabled
this(Logger log, bool noDelay = true) #
Create a cluster instance with the provided logger. Option noDelay controls the settting of the Nagle algorithm on an active connection to a server, which should be disabled by default (noDelay == true)
Cluster join() [final] #
Join the cluster as a client, discovering servers. Client applications should invoke this before making requests so that there are some servers to address.
If cache facilities will be used, then the join(cacheHosts) variation should be used instead
Cluster join(char[][] cacheHosts) [final] #
Join the cluster as a client, discovering servers. Client applications should invoke this before making requests so that there are some servers to address.
If cache facilities will be used, use this method to set the group of valid cache hosts. Each cache host should be described as an array of machine-name and port pairs e.g.
1
["lucy:1234", "daisy:3343", "daisy:3344"]

This sets up a fixed set of cache hosts, which should be identical for all cache clients. Cache hosts not included in this list will be ignored when they come online.

Logger log() [final] #
Return the logger instance provided during construction.
IChannel createChannel(char[] channel) [final] #
Create a channel instance. Our channel implementation includes a number of cached IO helpers (ProtocolWriter and so on) which simplifies and speeds up execution.
void notify(IEvent event) [private] #
ChannelListener method for listening to RollCall responses. These are sent out by cluster servers both when they get a RollCall request, and when they heartbeat.
class Broadcaster [private] #
Basic multicast support across the cluster. Multicast is used for broadcasting messages to all nodes in the cluster. We use it for cache-invalidation, heartbeat, rollcall and notification of queue activity
this() #
Setup a Cluster instance. Currently the buffer & writer are shared for all bulletin serialization; this should probably change at some point such that we can support multiple threads broadcasting concurrently to different output ports.
MulticastConduit conduit() [final] #
Setup the multicast options. Port is used as the sole address port for multicast usage, prefix is prepended to each fabricated multicast address (should be a valid class-D prefix), and ttl is the number of hops
void multicast(int port, int prefix = 225) [final] #
Setup the multicast options. Port is used as the sole address port for multicast usage & prefix is prepended to each fabricated multicast address (should be a valid class-D prefix: 225 through 239 inclusive)
void broadcast(char[] channel, IMessage message = null) [synchronized, final] #
Broadcast a message on the specified channel. This uses IP/Multicast to scatter the payload to all registered listeners (on the same multicast group). Note that the maximum message size is limited to that of an Ethernet data frame, minus the IP/UDP header size (1472 bytes).
Also note that we are synchronized to avoid contention on the otherwise shared output buffer.
InternetAddress getGroup(char[] channel) [synchronized, final] #
Return an internet address representing the multicast group for the specified channel. We use three of the four address segments to represent the channel itself (via a hash on the channel name), and set the primary segment to be that of the broadcast prefix (above).
class Channel : IChannel [private] #
A channel represents something akin to a publish/subscribe topic, or a radio station. These are used to segregate cluster operations into a set of groups, where each group is represented by a channel. Channel names are whatever you want then to be: use of dot notation has proved useful in the past. Channel maintain internal state in order to avoid heap activity. So they should not be shared across threads without appropriate synchs in place. One remedy is create another channel instance
this(Cluster cluster, char[] name) #
Construct a channel with the specified name. We cache a number of session-related constructs here also, in order to eliminate runtime overhead
char[] name() [final] #
Return the name of this channel. This is the name provided when the channel was constructed.
Cluster cluster() [final] #
Return the assigned cluster
Logger log() [final] #
Return the assigned logger
void write(IWriter writer) [final] #
Output this channel via the provided IWriter
void read(IReader reader) [final] #
Input this channel via the provided IReader
IMessage thaw(IMessage host = null) [final] #
deserialize a message into a provided host, or via the registered instance of the incoming message
IConsumer createConsumer(ChannelListener notify) [final] #
Create a listener of the specified type. Listeners are run within their own thread, since they spend the vast majority of their time blocked on a Socket read. Would be good to support multiplexed reading instead, such that a thread pool could be applied instead.
IConsumer createBulletinConsumer(ChannelListener notify) [final] #
Create a listener of the specified type. Listeners are run within their own thread, since they spend the vast majority of their time blocked on a Socket read. Would be good to support multiplexed reading instead, such that a thread pool could be applied instead.
IMessage getCache(char[] key, bool remove, IMessage host = null) [final] #
Return a entry from the network cache, and optionally remove it. This is a synchronous operation as opposed to the asynchronous nature of an invalidate broadcast.
bool putCache(char[] key, IMessage message) [final] #
Place an entry into the network cache, replacing the entry with the identical key. Where message.time is set, it will be used to test for newer cache entries than the one being sent i.e. if someone else placed a newer entry into the cache, that one will remain. Note that this may cause the oldest entry in the cache to be displaced if the cache is already full.
bool loadCache(char[] key, IMessage message) [final] #
Load a network cache entry remotely. This sends the given IMessage over a network to the cache host, where it will be executed locally. The benefit of doing so it that the host may deny access to the cache entry for the duration of the load operation. This, in turn, provides a mechanism for gating/synchronizing multiple network clients over a given cache entry; quite handy for those entries that are relatively expensive to construct or access.
IMessage getQueue(IMessage host = null) [final] #
Query the cluster for queued entries on the corresponding channel. Returns, and removes, the first matching entry from the cluster. Note that this sweeps the cluster for matching entries, and is synchronous in nature. The more common approach is to setup a queue listener, which will grab and dispatch queue entries asynchronously.
bool scanQueue() [private] #
Query the cluster for queued entries on the corresponding channel. Returns, and removes, the first matching entry from the cluster. Note that this sweeps the cluster for matching entries, and is synchronous in nature. The more common approach is to setup a queue listener, which will grab and dispatch queue entries asynchronously.
IMessage putQueue(IMessage message) [final] #
Add an entry to the specified network queue. May throw a QueueFullException if there's no room available.
bool execute(IMessage message) [final] #
Send a remote call request to a server, and place the result back into the provided message
void broadcast(IMessage message = null) [final] #
Broadcast a message on the specified channel. This uses IP/Multicast to scatter the message to all registered listeners (on the same multicast group). Note that the maximum message size is limited to that of an Ethernet data frame, minus the IP/UDP header size (1472 bytes).
class BulletinConsumer : SocketListener, IConsumer, IEvent [private] #
A listener for multicast channel traffic. These are currently used for cache coherency, queue publishing, and node discovery activity; though could be used for direct messaging also.
Be careful when using the retained channel, since it is shared with the calling thread. Thus a race condition could arise between the client and this thread, were both to use the channel for transfers at the same instant. Note that MessageConsumer makes a copy of the channel for this purpose
this(Channel channel, ChannelListener listener) #
Construct a multicast consumer for the specified event. The event handler will be invoked whenever a message arrives for the associated channel.
void notify(IBuffer buffer) [override] #
Notification callback invoked when we receive a multicast packet. Note that we check the packet channel-name against the one we're consuming, to check for cases where the group address had a hash collision.
IMessage get(IMessage host = null) #
Logger log() [final] #
Return the assigned logger
void exception(char [] msg) [override] #
Handle error conditions from the listener thread.
void invoke(IEvent event) [protected] #
Overridable mean of notifying the client code.
Channel channel() [final] #
Return the cluster instance we're associated with.
void pauseGroup() [final] #
Temporarily halt listening. This can be used to ignore multicast messages while, for example, the consumer is busy doing other things.
void resumeGroup() [final] #
Resume listening, post-pause.
void cancel() [final] #
Cancel this consumer. The listener is effectively disabled from this point forward. The listener thread does not halt at this point, but waits until the socket-read returns. Note that the D Interface implementation requires us to "reimplement and dispatch" trivial things like this ~ it's a pain in the neck to maintain.
void reply(IChannel channel, IMessage message) #
Send a message back to the producer
IChannel replyChannel(IMessage message) #
Return an appropriate reply channel for the given message, or return null if no reply is expected
class MessageConsumer : BulletinConsumer [private] #
A listener for queue events. These events are produced by the queue host on a periodic bases when it has available entries. We listen for them (rather than constantly scanning) and then begin a sweep to process as many as we can. Note that we will be in competition with other nodes to process these entries.
Also note that we create a copy of the channel in use, so that race-conditions with the requesting client are avoided.
this(Channel channel, ChannelListener listener) #
Construct a multicast consumer for the specified event
void exception(char [] msg) [override] #
Handle error conditions from the listener thread.
IMessage get(IMessage host = null) [override] #
Overrides the default processing to sweep the cluster for queued entries. Each server node is queried until one is found that contains a message. Note that it is possible to set things up where we are told exactly which node to go to; however given that we won't be listening whilst scanning, and that there's likely to be a group of new entries in the cluster, it's just as effective to scan. This will be far from ideal for all environments, so we should make the strategy pluggable instead.
Note also that the content is retrieved via a duplicate channel to avoid potential race-conditions on the original
void reply(IChannel channel, IMessage message) [override] #
Send a message back to the producer
void invoke(IEvent event) [protected, override] #
Override the default notification handler in order to disable multicast reciepts while the application does what it needs to
class Connection [private] #
An abstraction of a socket connection. Used internally by the socket-based Cluster.
class ConnectionPool [private] #
A pool of socket connections for accessing cluster nodes. Note that the entries will timeout after a period of inactivity, and will subsequently cause a connected host to drop the supporting session.
class PoolConnection : Connection [static] #
Utility class to provide the basic connection facilities provided by the connection pool.
this(ConnectionPool pool) #
Construct a new connection and set its parent
bool reset() [final] #
Create a new socket and connect it to the specified server. This will cause a dedicated thread to start on the server. Said thread will quit when an error occurs.
SocketConduit conduit() [final] #
Return the socket belonging to this connection
void close() [final] #
Close the socket. This will cause any host session to be terminated.
void done(Time time) [final] #
Return this connection to the free-list. Note that we have to synchronize on the parent-pool itself.
this(InternetAddress address, Logger log, bool noDelay) #
Create a connection-pool for the specified address.
Connection borrow(Time time) [synchronized, final] #
Allocate a Connection from a list rather than creating a new one. Reap old entries as we go.
void close() [synchronized, final] #
Close this pool and drop all existing connections.
class Node [private] #
Class to represent a cluster node. Each node supports both cache and queue functionality. Note that the set of available nodes is configured at startup, simplifying the discovery process in some significant ways, and causing less thrashing of cache-keys.
this(Logger log, char[] addr, char[] name) #
Construct a node with the provided name. This name should be the network name of the hosting device.
void setPool(InternetAddress address, bool noDelay) [final] #
Add a cache/queue reference for the remote node
char[] toString() [override] #
Return the name of this node
char[] address() [final] #
Return the network address of this node
void fail() [final] #
Remove this Node from the cluster. The node is disabled until it is seen to recover.
bool isEnabled() [final] #
Get the current state of this node
void setEnabled(bool enabled) [final] #
Set the enabled state of this node
bool request(Requestor dg, ProtocolReader reader, out bool message) [final] #
request data; fail this Node if we can't connect. Note that we make several attempts to connect before writing the node off as a failure. We use a delegate to perform the request output since it may be invoked on more than one iteration, where the current attempt fails.
We return true if the cluster node responds, and false otherwise. Exceptions are thrown if they occured on the server. Parameter 'message' is set true if a message is available from the server response
class NodeSet [private] #
Models a generic set of cluster nodes. This is intended to be thread-safe, with no locking on a lookup operation
this(Logger log, bool noDelay) #
Logger logger() [final] #
Node addNode(Node node) [synchronized, final] #
Add a node to the list of servers
Node selectNode(uint index) [final] #
Select a cluster server based on a starting index. If the selected server is not currently enabled, we just try the next one. This behaviour should be consistent across each cluster client.
class Set [private, static] #
Host class for the set of nodes. We utilize this to enable atomic read/write where it would not be otherwise possible -- D arrays are organized as ptr+length pairs and are thus inherently non-atomic for assignment purposes
class FixedNodeSet : NodeSet [private] #
Models a fixed set of cluster nodes. Used for Cache
this(Logger log, bool noDelay) #
void enable(char[] addr) [synchronized, final] #
bool request(Node.Requestor dg, ProtocolReader reader, char[] key) [final] #
Select a cluster server based on the specified key. If the selected server is not currently enabled, we just try the next one. This behaviour should be consistent across each cluster client.
class FlexNodeSet : NodeSet [private] #
Models a flexible set of cluster nodes. Used for queue and task
this(Logger log, bool noDelay) #
void enable(char[] addr, char[] name) [synchronized, final] #
bool request(Node.Requestor dg, ProtocolReader reader) [final] #
Select a cluster server based on the specified key. If the selected server is not currently enabled, we just try the next one. This behaviour should be consistent across each cluster client.
bool scan(bool delegate(Node) dg) [final] #
Sweep the cluster servers. Returns true if the delegate returns true, false otherwise. The sweep is halted when the delegate returns true. Note that this scans nodes in a randomized pattern, which should tend to avoid 'bursty' activity by a set of clients upon any one cluster server.
uint jhash(void* k, uint len, uint init = 0) [private, static] #
The Bob Jenkins lookup2 algorithm. This should be relocated to somewhere common