Apache Tomcat 6.0.43

org.apache.catalina.tribes.group.interceptors
Class NonBlockingCoordinator

java.lang.Object
  extended by org.apache.catalina.tribes.group.ChannelInterceptorBase
      extended by org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator
All Implemented Interfaces:
ChannelInterceptor, Heartbeat, MembershipListener

public class NonBlockingCoordinator
extends ChannelInterceptorBase

Title: Auto merging leader election algorithm

Description: Implementation of a simple coordinator algorithm that not only selects a coordinator, it also merges groups automatically when members are discovered that werent part of the

This algorithm is non blocking meaning it allows for transactions while the coordination phase is going on

This implementation is based on a home brewed algorithm that uses the AbsoluteOrder of a membership to pass a token ring of the current membership.
This is not the same as just using AbsoluteOrder! Consider the following scenario:
Nodes, A,B,C,D,E on a network, in that priority. AbsoluteOrder will only work if all nodes are receiving pings from all the other nodes. meaning, that node{i} receives pings from node{all}-node{i}
but the following could happen if a multicast problem occurs. A has members {B,C,D}
B has members {A,C}
C has members {D,E}
D has members {A,B,C,E}
E has members {A,C,D}
Because the default Tribes membership implementation, relies on the multicast packets to arrive at all nodes correctly, there is nothing guaranteeing that it will.

To best explain how this algorithm works, lets take the above example: For simplicity we assume that a send operation is O(1) for all nodes, although this algorithm will work where messages overlap, as they all depend on absolute order
Scenario 1: A,B,C,D,E all come online at the same time Eval phase, A thinks of itself as leader, B thinks of A as leader, C thinks of itself as leader, D,E think of A as leader
Token phase:
(1) A sends out a message X{A-ldr, A-src, mbrs-A,B,C,D} to B where X is the id for the message(and the view)
(1) C sends out a message Y{C-ldr, C-src, mbrs-C,D,E} to D where Y is the id for the message(and the view)
(2) B receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D} to C
(2) D receives Y{C-ldr, C-src, mbrs-C,D,E} D is aware of A,B, sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to E
(3) C receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to D
(3) E receives Y{A-ldr, C-src, mbrs-A,B,C,D,E} sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to A
(4) D receives X{A-ldr, A-src, mbrs-A,B,C,D,E} sends sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to A
(4) A receives Y{A-ldr, C-src, mbrs-A,B,C,D,E}, holds the message, add E to its list of members
(5) A receives X{A-ldr, A-src, mbrs-A,B,C,D,E}
At this point, the state looks like
A - {A-ldr, mbrs-A,B,C,D,E, id=X}
B - {A-ldr, mbrs-A,B,C,D, id=X}
C - {A-ldr, mbrs-A,B,C,D,E, id=X}
D - {A-ldr, mbrs-A,B,C,D,E, id=X}
E - {A-ldr, mbrs-A,B,C,D,E, id=Y}

A message doesn't stop until it reaches its original sender, unless its dropped by a higher leader. As you can see, E still thinks the viewId=Y, which is not correct. But at this point we have arrived at the same membership and all nodes are informed of each other.
To synchronize the rest we simply perform the following check at A when A receives X:
Original X{A-ldr, A-src, mbrs-A,B,C,D} == Arrived X{A-ldr, A-src, mbrs-A,B,C,D,E}
Since the condition is false, A, will resend the token, and A sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to B When A receives X again, the token is complete.
Optionally, A can send a message X{A-ldr, A-src, mbrs-A,B,C,D,E confirmed} to A,B,C,D,E who then install and accept the view.

Lets assume that C1 arrives, C1 has lower priority than C, but higher priority than D.
Lets also assume that C1 sees the following view {B,D,E}
C1 waits for a token to arrive. When the token arrives, the same scenario as above will happen.
In the scenario where C1 sees {D,E} and A,B,C can not see C1, no token will ever arrive.
In this case, C1 sends a Z{C1-ldr, C1-src, mbrs-C1,D,E} to D
D receives Z{C1-ldr, C1-src, mbrs-C1,D,E} and sends Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} to E
E receives Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} and sends it to A
A sends Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E} to B and the chain continues until A receives the token again. At that time A optionally sends out Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E, confirmed} to A,B,C,C1,D,E

To ensure that the view gets implemented at all nodes at the same time, A will send out a VIEW_CONF message, this is the 'confirmed' message that is optional above.

Ideally, the interceptor below this one would be the TcpFailureDetector to ensure correct memberships

The example above, of course can be simplified with a finite statemachine:
But I suck at writing state machines, my head gets all confused. One day I will document this algorithm though.
Maybe I'll do a state diagram :)

State Diagrams

Initiate an election

Receive an election message

Version:
1.0
Author:
Filip Hanik

Nested Class Summary
static class NonBlockingCoordinator.CoordinationEvent
           
static class NonBlockingCoordinator.CoordinationMessage
           
 
Nested classes/interfaces inherited from interface org.apache.catalina.tribes.ChannelInterceptor
ChannelInterceptor.InterceptorEvent
 
Field Summary
protected static byte[] COORD_ALIVE
          Alive message
protected static byte[] COORD_CONF
          Coordination confirmation, for blocking installations
protected static byte[] COORD_HEADER
          header for a coordination message
protected static byte[] COORD_REQUEST
          Coordination request
protected  java.util.concurrent.atomic.AtomicBoolean coordMsgReceived
           
protected  java.lang.Object electionMutex
           
protected  Membership membership
          Our nonblocking membership
protected  boolean started
           
protected  int startsvc
           
protected  Membership suggestedView
           
protected  UniqueId suggestedviewId
          indicates that we are running an election and this is the one we are running
protected  Membership view
          Our current view
protected  UniqueId viewId
          Out current viewId
protected  long waitForCoordMsgTimeout
          Time to wait for coordination timeout
 
Fields inherited from class org.apache.catalina.tribes.group.ChannelInterceptorBase
log, optionFlag
 
Constructor Summary
NonBlockingCoordinator()
           
 
Method Summary
 boolean accept(ChannelMessage msg)
           
protected  boolean alive(Member mbr)
           
 ChannelData createData(NonBlockingCoordinator.CoordinationMessage msg, MemberImpl local)
           
 void fireInterceptorEvent(ChannelInterceptor.InterceptorEvent event)
           
 Member getCoordinator()
          Returns coordinator if one is available
 Member getLocalMember(boolean incAlive)
          Return the member that represents this node.
 Member getMember(Member mbr)
          Intercepts the code>Channel.getMember(Member) method
 Member[] getMembers()
          Get all current cluster members
 Member getNextInLine(MemberImpl local, MemberImpl[] others)
           
 Member[] getView()
           
 UniqueId getViewId()
           
protected  void halt()
          Block in/out messages while a election is going on
protected  void handleMyToken(MemberImpl local, NonBlockingCoordinator.CoordinationMessage msg, Member sender, Membership merged)
           
protected  void handleOtherToken(MemberImpl local, NonBlockingCoordinator.CoordinationMessage msg, Member sender, Membership merged)
           
protected  void handleToken(NonBlockingCoordinator.CoordinationMessage msg, Member sender, Membership merged)
           
protected  void handleViewConf(NonBlockingCoordinator.CoordinationMessage msg, Member sender, Membership merged)
           
protected  boolean hasHigherPriority(Member[] complete, Member[] local)
           
 boolean hasMembers()
          has members
 void heartbeat()
          The heartbeat() method gets invoked periodically to allow interceptors to clean up resources, time out object and perform actions that are unrelated to sending/receiving data.
 boolean isCoordinator()
           
 boolean isHighest()
           
protected  boolean isViewConf(NonBlockingCoordinator.CoordinationMessage msg)
           
 void memberAdded(Member member)
          A member was added to the group
 void memberAdded(Member member, boolean elect)
           
 void memberDisappeared(Member member)
          A member was removed from the group
If the member left voluntarily, the Member.getCommand will contain the Member.SHUTDOWN_PAYLOAD data
protected  Membership mergeOnArrive(NonBlockingCoordinator.CoordinationMessage msg, Member sender)
           
 void messageReceived(ChannelMessage msg)
          the messageReceived is invoked when a message is received.
protected  void processCoordMessage(NonBlockingCoordinator.CoordinationMessage msg, Member sender)
           
protected  void release()
          Release lock for in/out messages election is completed
protected  void sendElectionMsg(MemberImpl local, MemberImpl next, NonBlockingCoordinator.CoordinationMessage msg)
           
protected  void sendElectionMsgToNextInline(MemberImpl local, NonBlockingCoordinator.CoordinationMessage msg)
           
 void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload)
          The sendMessage method is called when a message is being sent to one more destinations.
protected  void setupMembership()
           
 void start(int svc)
          Starts up the channel.
 void startElection(boolean force)
           
 void stop(int svc)
          Shuts down the channel.
protected  void viewChange(UniqueId viewId, Member[] view)
           
protected  void waitForRelease()
          Wait for an election to end
 
Methods inherited from class org.apache.catalina.tribes.group.ChannelInterceptorBase
getNext, getOptionFlag, getPrevious, okToProcess, setNext, setOptionFlag, setPrevious
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

COORD_HEADER

protected static final byte[] COORD_HEADER
header for a coordination message


COORD_REQUEST

protected static final byte[] COORD_REQUEST
Coordination request


COORD_CONF

protected static final byte[] COORD_CONF
Coordination confirmation, for blocking installations


COORD_ALIVE

protected static final byte[] COORD_ALIVE
Alive message


waitForCoordMsgTimeout

protected long waitForCoordMsgTimeout
Time to wait for coordination timeout


view

protected Membership view
Our current view


viewId

protected UniqueId viewId
Out current viewId


membership

protected Membership membership
Our nonblocking membership


suggestedviewId

protected UniqueId suggestedviewId
indicates that we are running an election and this is the one we are running


suggestedView

protected Membership suggestedView

started

protected boolean started

startsvc

protected final int startsvc
See Also:
Constant Field Values

electionMutex

protected java.lang.Object electionMutex

coordMsgReceived

protected java.util.concurrent.atomic.AtomicBoolean coordMsgReceived
Constructor Detail

NonBlockingCoordinator

public NonBlockingCoordinator()
Method Detail

startElection

public void startElection(boolean force)
                   throws ChannelException
Throws:
ChannelException

sendElectionMsg

protected void sendElectionMsg(MemberImpl local,
                               MemberImpl next,
                               NonBlockingCoordinator.CoordinationMessage msg)
                        throws ChannelException
Throws:
ChannelException

sendElectionMsgToNextInline

protected void sendElectionMsgToNextInline(MemberImpl local,
                                           NonBlockingCoordinator.CoordinationMessage msg)
                                    throws ChannelException
Throws:
ChannelException

getNextInLine

public Member getNextInLine(MemberImpl local,
                            MemberImpl[] others)

createData

public ChannelData createData(NonBlockingCoordinator.CoordinationMessage msg,
                              MemberImpl local)

viewChange

protected void viewChange(UniqueId viewId,
                          Member[] view)

alive

protected boolean alive(Member mbr)

mergeOnArrive

protected Membership mergeOnArrive(NonBlockingCoordinator.CoordinationMessage msg,
                                   Member sender)

processCoordMessage

protected void processCoordMessage(NonBlockingCoordinator.CoordinationMessage msg,
                                   Member sender)
                            throws ChannelException
Throws:
ChannelException

handleToken

protected void handleToken(NonBlockingCoordinator.CoordinationMessage msg,
                           Member sender,
                           Membership merged)
                    throws ChannelException
Throws:
ChannelException

handleMyToken

protected void handleMyToken(MemberImpl local,
                             NonBlockingCoordinator.CoordinationMessage msg,
                             Member sender,
                             Membership merged)
                      throws ChannelException
Throws:
ChannelException

handleOtherToken

protected void handleOtherToken(MemberImpl local,
                                NonBlockingCoordinator.CoordinationMessage msg,
                                Member sender,
                                Membership merged)
                         throws ChannelException
Throws:
ChannelException

handleViewConf

protected void handleViewConf(NonBlockingCoordinator.CoordinationMessage msg,
                              Member sender,
                              Membership merged)
                       throws ChannelException
Throws:
ChannelException

isViewConf

protected boolean isViewConf(NonBlockingCoordinator.CoordinationMessage msg)

hasHigherPriority

protected boolean hasHigherPriority(Member[] complete,
                                    Member[] local)

getCoordinator

public Member getCoordinator()
Returns coordinator if one is available

Returns:
Member

getView

public Member[] getView()

getViewId

public UniqueId getViewId()

halt

protected void halt()
Block in/out messages while a election is going on


release

protected void release()
Release lock for in/out messages election is completed


waitForRelease

protected void waitForRelease()
Wait for an election to end


start

public void start(int svc)
           throws ChannelException
Description copied from class: ChannelInterceptorBase
Starts up the channel. This can be called multiple times for individual services to start The svc parameter can be the logical or value of any constants

Specified by:
start in interface ChannelInterceptor
Overrides:
start in class ChannelInterceptorBase
Parameters:
svc - int value of
DEFAULT - will start all services
MBR_RX_SEQ - starts the membership receiver
MBR_TX_SEQ - starts the membership broadcaster
SND_TX_SEQ - starts the replication transmitter
SND_RX_SEQ - starts the replication receiver
Throws:
ChannelException - if a startup error occurs or the service is already started.
See Also:
Channel

stop

public void stop(int svc)
          throws ChannelException
Description copied from class: ChannelInterceptorBase
Shuts down the channel. This can be called multiple times for individual services to shutdown The svc parameter can be the logical or value of any constants

Specified by:
stop in interface ChannelInterceptor
Overrides:
stop in class ChannelInterceptorBase
Parameters:
svc - int value of
DEFAULT - will shutdown all services
MBR_RX_SEQ - stops the membership receiver
MBR_TX_SEQ - stops the membership broadcaster
SND_TX_SEQ - stops the replication transmitter
SND_RX_SEQ - stops the replication receiver
Throws:
ChannelException - if a startup error occurs or the service is already started.
See Also:
Channel

sendMessage

public void sendMessage(Member[] destination,
                        ChannelMessage msg,
                        InterceptorPayload payload)
                 throws ChannelException
Description copied from interface: ChannelInterceptor
The sendMessage method is called when a message is being sent to one more destinations. The interceptor can modify any of the parameters and then pass on the message down the stack by invoking getNext().sendMessage(destination,msg,payload)
Alternatively the interceptor can stop the message from being sent by not invoking getNext().sendMessage(destination,msg,payload)
If the message is to be sent asynchronous the application can be notified of completion and errors by passing in an error handler attached to a payload object.
The ChannelMessage.getAddress contains Channel.getLocalMember, and can be overwritten to simulate a message sent from another node.

Specified by:
sendMessage in interface ChannelInterceptor
Overrides:
sendMessage in class ChannelInterceptorBase
Parameters:
destination - Member[] - the destination for this message
msg - ChannelMessage - the message to be sent
payload - InterceptorPayload - the payload, carrying an error handler and future useful data, can be null
Throws:
ChannelException
See Also:
ErrorHandler, InterceptorPayload

messageReceived

public void messageReceived(ChannelMessage msg)
Description copied from interface: ChannelInterceptor
the messageReceived is invoked when a message is received. ChannelMessage.getAddress() is the sender, or the reply-to address if it has been overwritten.

Specified by:
messageReceived in interface ChannelInterceptor
Overrides:
messageReceived in class ChannelInterceptorBase
Parameters:
msg - ChannelMessage

accept

public boolean accept(ChannelMessage msg)
Overrides:
accept in class ChannelInterceptorBase

memberAdded

public void memberAdded(Member member)
Description copied from interface: MembershipListener
A member was added to the group

Specified by:
memberAdded in interface MembershipListener
Overrides:
memberAdded in class ChannelInterceptorBase
Parameters:
member - Member - the member that was added

memberAdded

public void memberAdded(Member member,
                        boolean elect)

memberDisappeared

public void memberDisappeared(Member member)
Description copied from interface: MembershipListener
A member was removed from the group
If the member left voluntarily, the Member.getCommand will contain the Member.SHUTDOWN_PAYLOAD data

Specified by:
memberDisappeared in interface MembershipListener
Overrides:
memberDisappeared in class ChannelInterceptorBase
Parameters:
member - Member
See Also:
Member.SHUTDOWN_PAYLOAD

isHighest

public boolean isHighest()

isCoordinator

public boolean isCoordinator()

heartbeat

public void heartbeat()
Description copied from interface: ChannelInterceptor
The heartbeat() method gets invoked periodically to allow interceptors to clean up resources, time out object and perform actions that are unrelated to sending/receiving data.

Specified by:
heartbeat in interface ChannelInterceptor
Specified by:
heartbeat in interface Heartbeat
Overrides:
heartbeat in class ChannelInterceptorBase

hasMembers

public boolean hasMembers()
has members

Specified by:
hasMembers in interface ChannelInterceptor
Overrides:
hasMembers in class ChannelInterceptorBase
Returns:
boolean - if the channel has members in its membership group
See Also:
Channel.hasMembers()

getMembers

public Member[] getMembers()
Get all current cluster members

Specified by:
getMembers in interface ChannelInterceptor
Overrides:
getMembers in class ChannelInterceptorBase
Returns:
all members or empty array
See Also:
Channel.getMembers()

getMember

public Member getMember(Member mbr)
Description copied from interface: ChannelInterceptor
Intercepts the code>Channel.getMember(Member) method

Specified by:
getMember in interface ChannelInterceptor
Overrides:
getMember in class ChannelInterceptorBase
Parameters:
mbr - Member
Returns:
Member
See Also:
Channel.getMember(Member)

getLocalMember

public Member getLocalMember(boolean incAlive)
Return the member that represents this node.

Specified by:
getLocalMember in interface ChannelInterceptor
Overrides:
getLocalMember in class ChannelInterceptorBase
Parameters:
incAlive - boolean
Returns:
Member
See Also:
Channel.getLocalMember(boolean)

setupMembership

protected void setupMembership()

fireInterceptorEvent

public void fireInterceptorEvent(ChannelInterceptor.InterceptorEvent event)
Specified by:
fireInterceptorEvent in interface ChannelInterceptor
Overrides:
fireInterceptorEvent in class ChannelInterceptorBase

Apache Tomcat 6.0.43

Copyright © 2000-2014 Apache Software Foundation. All Rights Reserved.