Package io.mats3.matssocket
Interface ClusterStoreAndForward
public interface ClusterStoreAndForward
MatsSockets forwards requests from WebSocket-connected clients to a Mats Endpoint, and must get the reply back to the
client. The WebSocket is "static" in that it is a fixed connection to one specific node (as long as the connection is
up), which means that we need to get the message back to the same node that fired off the request. We could have used
the same logic as with MatsFuturizer (i.e. the "last jump" uses a node-specific Topic), but that was deemed too
feeble: We want reliable messaging all the way to the client. We want to be able to handle that the client loses the
connection (e.g. sitting on a train which drives through a tunnel), and thus will reconnect. The client might not come
back to the same server as last time. We also want to be able to reboot any server (typically deploy of new code) at
any time, but this obviously kills all WebSockets that are attached to it. To be able to ensure reliable messaging for
MatsSockets, we need to employ a "store and forward" logic: When the reply comes in, which as with standard Mats
logic can happen on any node in the cluster of nodes handling this Endpoint Stage, the reply message is temporarily
stored in some reliable storage. We then look up which node currently holds the MatsSession, and notify it about
new messages for that session. This node gets the notice, finds the now local MatsSession, and forwards the message.
Note that it is possible that the node getting the reply, and the node which holds the WebSocket/MatsSession, are the
same node, in which case it eventually results in a local forward.
Each node has its own instance of this class, connected to the same backing datastore.
It is assumed that the consumption of messages for a session is done single threaded, on one node only. That is, only
one thread on one node will actually get messages (getMessagesFromOutbox(String, int)) and, more importantly, register
them as completed. With regard to multiple nodes, this argument still holds, since only one node can hold a
MatsSocketSession - the one that has the actual WebSocket connection. I believe it is possible to construct a bad
async situation here (connect to one node, authenticate, get SessionId, immediately disconnect and perform reconnect,
and do this until the current ClusterStoreAndForward has the wrong idea of which node holds the Session) but this
should at most result in the client screwing up for itself (not getting messages). A Session is not registered until
the client has authenticated, so this will never lead to information leakage to other users. Such a situation will
also resolve if the client again performs a non-malicious reconnect. It is the server that constructs and holds
SessionIds: A client cannot itself force the server side to create a Session or SessionId - it can only reconnect to
an existing SessionId that it was given earlier.
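The store-and-forward flow described above can be sketched with a small in-memory model. This is only an illustration under stated assumptions: the class StoreAndForwardSketch, its methods and the notifications list are hypothetical stand-ins, the maps play the role of the shared data store, and the cross-node notification is reduced to a recorded string.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory model of the store-and-forward idea; not the real implementation.
class StoreAndForwardSketch {
    // Stand-ins for the shared data store: session -> home node, session -> pending messages.
    private final Map<String, String> sessionHome = new HashMap<>();
    private final Map<String, List<String>> outbox = new HashMap<>();
    // Records "node:session" for each notification, standing in for a real cross-node ping.
    final List<String> notifications = new ArrayList<>();

    void registerSession(String sessionId, String nodeName) {
        sessionHome.put(sessionId, nodeName); // a session has at most one home
    }

    // Runs on whichever node happened to receive the reply.
    Optional<String> onReply(String sessionId, String message) {
        // 1. Store the message before doing anything else.
        outbox.computeIfAbsent(sessionId, key -> new ArrayList<>()).add(message);
        // 2. Look up the node currently holding the session, and notify it if there is one.
        Optional<String> home = Optional.ofNullable(sessionHome.get(sessionId));
        home.ifPresent(node -> notifications.add(node + ":" + sessionId));
        return home;
    }

    // Runs on the home node when it gets the notification: hand over what is pending.
    List<String> forwardPending(String sessionId) {
        List<String> pending = outbox.remove(sessionId);
        if (pending == null) {
            return new ArrayList<>();
        }
        return pending;
    }
}
```

If the receiving node and the home node are the same, the notification step simply becomes a local call to the forwarding logic.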
Nested Class Summary
Modifier and Type | Interface | Description
static interface | |
static class | ClusterStoreAndForward.DataAccessException | If having problems accessing the underlying common data store.
static class | ClusterStoreAndForward.MessageIdAlreadyExistsException | Thrown if the operation resulted in a Unique Constraint situation.
static interface | |
static class | |
static class | |
static class | |
static class | |
static interface | |
static interface | |
static class | ClusterStoreAndForward.WrongUserException | Thrown from registerSessionAtThisNode(String, String, String, String, String, String) if the userId does not match the original userId that created this session.
-
Method Summary
Modifier and Type | Method | Description
void | boot() | Start the ClusterStoreAndForward, perform DB preparations and migrations.
void | closeSession(String matsSocketSessionId) | Invoked when the client explicitly tells us that it closed this session, CLOSE_SESSION.
void | deleteMessageIdsFromInbox(String matsSocketSessionId, Collection<String> clientMessageIds) | Deletes the incoming message Ids, as we've established that the client will never try to send this particular message again.
void | deregisterSessionFromThisNode(String matsSocketSessionId, String connectionId) | Deregisters a Session home when a WebSocket is closed.
Optional<ClusterStoreAndForward.RequestCorrelation> | getAndDeleteRequestCorrelation(String matsSocketSessionId, String serverMessageId) |
Optional<ClusterStoreAndForward.CurrentNode> | getCurrentRegisteredNodeForSession(String matsSocketSessionId) |
ClusterStoreAndForward.StoredInMessage | getMessageFromInbox(String matsSocketSessionId, String clientMessageId) |
List<ClusterStoreAndForward.StoredOutMessage> | getMessagesFromOutbox(String matsSocketSessionId, int maxNumberOfMessages) | Fetch a set of messages, up to 'maxNumberOfMessages' - but do not include messages that have been attempted delivered already (marked with outboxMessagesAttemptedDelivery(String, Collection)).
List<MatsSocketServer.MatsSocketSessionDto> | getSessions(boolean onlyActive, String userId, String appName, String appVersionAtOrAbove) | Direct implementation of MatsSocketServer.getMatsSocketSessions(boolean, String, String, String) - go read there for semantics.
int | getSessionsCount(boolean onlyActive, String userId, String appName, String appVersionAtOrAbove) | Direct implementation of MatsSocketServer.getMatsSocketSessionsCount(boolean, String, String, String) - go read there for semantics.
boolean | isSessionExists(String matsSocketSessionId) |
void | notifySessionLiveliness(Collection<String> matsSocketSessionIds) | Shall be invoked on some kind of schedule (e.g. every 1 minute) by every node to inform ClusterStoreAndForward about which Sessions are currently live on that node.
void | outboxMessagesAttemptedDelivery(String matsSocketSessionId, Collection<String> serverMessageIds) | Marks the specified messages as attempted delivered and notches the ClusterStoreAndForward.StoredOutMessage.getDeliveryCount() one up.
void | outboxMessagesComplete(String matsSocketSessionId, Collection<String> serverMessageIds) | States that the messages are delivered.
void | outboxMessagesDeadLetterQueue(String matsSocketSessionId, Collection<String> serverMessageIds) | States that the messages overran the accepted number of delivery attempts.
void | outboxMessagesUnmarkAttemptedDelivery(String matsSocketSessionId) | When this method is invoked, getMessagesFromOutbox(String, int) will again return messages that have previously been marked as attempted delivered with outboxMessagesAttemptedDelivery(String, Collection).
long | registerSessionAtThisNode(String matsSocketSessionId, String userId, String connectionId, String clientLibAndVersions, String appName, String appVersion) | Registers a Session home to this node - only one node can ever be home, so any old is deleted.
int | scavengeSessionRemnants() | Shall be invoked on some kind of schedule (e.g. every hour on average).
void | storeMessageIdInInbox(String matsSocketSessionId, String clientMessageId) | Stores the incoming message Id, to avoid double delivery.
Optional<ClusterStoreAndForward.CurrentNode> | storeMessageInOutbox(String matsSocketSessionId, String serverMessageId, String clientMessageId, String traceId, MatsSocketServer.MessageType type, Long requestTimestamp, String envelope, String messageJson, byte[] messageBinary) | Stores the message for the Session, returning the nodename for the node holding the session, if any.
void | storeRequestCorrelation(String matsSocketSessionId, String serverMessageId, long requestTimestamp, String replyTerminatorId, String correlationString, byte[] correlationBinary) |
Collection<String> | timeoutSessions(long notLiveSinceTimestamp) | Shall be invoked on some kind of schedule (e.g. every 10 minutes on average).
void | updateMessageInInbox(String matsSocketSessionId, String clientMessageId, String messageJson, byte[] messageBinary) | Stores the resulting message (envelope and binary), so that if the incoming message comes again (based on storeMessageIdInInbox(String, String) throwing ClusterStoreAndForward.MessageIdAlreadyExistsException), the result from the previous processing can be returned right away.
-
Method Details
-
boot
void boot()
Start the ClusterStoreAndForward, perform DB preparations and migrations.
-
registerSessionAtThisNode
long registerSessionAtThisNode(String matsSocketSessionId, String userId, String connectionId, String clientLibAndVersions, String appName, String appVersion) throws ClusterStoreAndForward.WrongUserException, ClusterStoreAndForward.DataAccessException
Registers a Session home to this node - only one node can ever be home, so any old registration is deleted. When "re-registering" a session, it is asserted that the provided 'userId' is the same UserId as originally registered - a ClusterStoreAndForward.WrongUserException is thrown if this does not match.
Parameters:
matsSocketSessionId - the SessionId for this connection.
userId - the UserId, as provided by AuthenticationPlugin, that owns this MatsSocketSessionId.
connectionId - an id that is unique for this specific WebSocket Session (i.e. TCP Connection), so that if it closes, a new registration will not be deregistered by the old MatsSocketSession realizing that it is closed and then invoking deregisterSessionFromThisNode(String, String).
clientLibAndVersions - the Client Library and Versions + runtime information for the Client.
appName - the AppName of the accessing Client app.
appVersion - the AppVersion of the accessing Client app.
Returns:
the created timestamp - which is either "now" if this is the first registration, or, if it is a "reconnect", the time when this MatsSocketSession was initially created.
Throws:
ClusterStoreAndForward.WrongUserException - if the userId provided does not match the original userId that created the session.
ClusterStoreAndForward.DataAccessException - if problems with underlying data store.
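The registration contract (first registration returns "now", re-registration returns the original created timestamp, and a differing userId is rejected) could look roughly like the following in-memory sketch. Everything here is an assumption made for illustration: the class RegisterSessionSketch, the extra 'nodeName' and 'now' parameters, and the exception being modelled as unchecked to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the shared data store; names are illustrative only.
class RegisterSessionSketch {
    // Modelled as unchecked here purely for brevity.
    static class WrongUserException extends RuntimeException {
        WrongUserException(String message) {
            super(message);
        }
    }

    // One row per session: owner and creation time never change, the home does.
    private static final class SessionRow {
        final String userId;
        final long createdTimestamp;
        String nodeName;
        String connectionId;

        SessionRow(String userId, long createdTimestamp) {
            this.userId = userId;
            this.createdTimestamp = createdTimestamp;
        }
    }

    private final Map<String, SessionRow> sessions = new HashMap<>();

    // 'nodeName' and 'now' are passed explicitly so the sketch stays deterministic.
    long register(String nodeName, String sessionId, String userId, String connectionId, long now) {
        SessionRow row = sessions.get(sessionId);
        if (row == null) {
            row = new SessionRow(userId, now);
            sessions.put(sessionId, row);
        } else if (!row.userId.equals(userId)) {
            throw new WrongUserException("Session [" + sessionId + "] was created by another user.");
        }
        // Overwrite any previous home: only one node can be home at a time.
        row.nodeName = nodeName;
        row.connectionId = connectionId;
        return row.createdTimestamp;
    }

    String homeOf(String sessionId) {
        SessionRow row = sessions.get(sessionId);
        return row == null ? null : row.nodeName;
    }
}
```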
-
getCurrentRegisteredNodeForSession
Optional<ClusterStoreAndForward.CurrentNode> getCurrentRegisteredNodeForSession(String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException
Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to find current session home.
Returns:
the current node holding MatsSocket Session, or empty if none.
Throws:
ClusterStoreAndForward.DataAccessException
-
isSessionExists
boolean isSessionExists(String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException
Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to check whether there is a session.
Returns:
whether a CSAF Session exists - NOTE: Not whether it is currently registered!
Throws:
ClusterStoreAndForward.DataAccessException
-
getSessions
List<MatsSocketServer.MatsSocketSessionDto> getSessions(boolean onlyActive, String userId, String appName, String appVersionAtOrAbove) throws ClusterStoreAndForward.DataAccessException
Direct implementation of MatsSocketServer.getMatsSocketSessions(boolean, String, String, String) - go read there for semantics.
Returns:
the list of all MatsSocketSessions currently registered with this MatsSocketServer instance matching the constraints if set - as read from the data store.
Throws:
ClusterStoreAndForward.DataAccessException
-
getSessionsCount
int getSessionsCount(boolean onlyActive, String userId, String appName, String appVersionAtOrAbove) throws ClusterStoreAndForward.DataAccessException
Direct implementation of MatsSocketServer.getMatsSocketSessionsCount(boolean, String, String, String) - go read there for semantics.
Returns:
the count of all MatsSocketSessions currently registered with this MatsSocketServer instance matching the constraints if set - as read from the data store.
Throws:
ClusterStoreAndForward.DataAccessException
-
deregisterSessionFromThisNode
void deregisterSessionFromThisNode(String matsSocketSessionId, String connectionId) throws ClusterStoreAndForward.DataAccessException
Deregisters a Session home when a WebSocket is closed. This node's nodename and this WebSocket Session's ConnectionId are taken into account, so that if the registration has changed asynchronously, it will not deregister a new session home (which can potentially be on the same node, hence the 'connectionId' parameter).
Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to deregister the specific WebSocket Session's ConnectionId.
connectionId - an id that is unique for this specific WebSocket Session (i.e. TCP Connection), so that if it closes, a new registration will not be deregistered by the old MatsSocketSession realizing that it is closed and then invoking deregisterSessionFromThisNode(String, String).
Throws:
ClusterStoreAndForward.DataAccessException
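Why both the nodename and the connectionId matter can be seen in a small sketch: a late deregistration from an old, closed connection must not remove a newer registration for the same session, even when both connections were on the same node. The class DeregisterSketch and its methods are hypothetical, and the boolean return value exists only so the effect can be observed (the method documented above returns void).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the "session home" registration; illustrative only.
class DeregisterSketch {
    private static final class Home {
        final String nodeName;
        final String connectionId;

        Home(String nodeName, String connectionId) {
            this.nodeName = nodeName;
            this.connectionId = connectionId;
        }
    }

    private final Map<String, Home> homes = new HashMap<>();

    void register(String sessionId, String nodeName, String connectionId) {
        homes.put(sessionId, new Home(nodeName, connectionId)); // replaces any old home
    }

    // Removes the home only if it still refers to this node AND this connection.
    boolean deregister(String sessionId, String nodeName, String connectionId) {
        Home home = homes.get(sessionId);
        if (home == null || !home.nodeName.equals(nodeName) || !home.connectionId.equals(connectionId)) {
            return false; // a newer registration exists (or none at all): leave it alone
        }
        homes.remove(sessionId);
        return true;
    }

    boolean hasHome(String sessionId) {
        return homes.containsKey(sessionId);
    }
}
```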
-
closeSession
void closeSession(String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException
Invoked when the client explicitly tells us that it closed this session, CLOSE_SESSION. This deletes the session instance, and any messages in queue for it. No new incoming REPLYs, SENDs or RECEIVEs for this SessionId will be sent anywhere. (Notice that the implementation might still store any new outboxed messages, not checking that the session actually exists. But this SessionId will never be reused (except by extreme "luck"), and the implementation will periodically purge such non-session-attached outbox messages).
Parameters:
matsSocketSessionId - the MatsSocketSessionId that should be closed.
Throws:
ClusterStoreAndForward.DataAccessException
-
notifySessionLiveliness
void notifySessionLiveliness(Collection<String> matsSocketSessionIds) throws ClusterStoreAndForward.DataAccessException
Shall be invoked on some kind of schedule (e.g. every 1 minute) by every node to inform ClusterStoreAndForward about which Sessions are currently live on that node. Sessions that aren't live will be scavenged after some time by invocation of timeoutSessions(long).
Parameters:
matsSocketSessionIds - which sessions are currently live on the invoking node.
Throws:
ClusterStoreAndForward.DataAccessException
-
timeoutSessions
Collection<String> timeoutSessions(long notLiveSinceTimestamp) throws ClusterStoreAndForward.DataAccessException
Shall be invoked on some kind of schedule (e.g. every 10 minutes on average). Times out any session which has not been notified about liveliness since the supplied timestamp (millis from epoch).
Parameters:
notLiveSinceTimestamp - decides which sessions are too old: Sessions which have not been notified about liveliness since this timestamp will be deleted.
Returns:
which sessions were timed out.
Throws:
ClusterStoreAndForward.DataAccessException
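The interplay between notifySessionLiveliness and timeoutSessions can be sketched as a "last seen" timestamp per session. This is a minimal model under assumptions: the class LivelinessSketch and the createSession helper are hypothetical, and 'now' is passed in explicitly rather than read from the clock so the behaviour is reproducible.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of liveliness tracking; illustrative only.
class LivelinessSketch {
    // sessionId -> millis-from-epoch of the latest liveliness notification
    private final Map<String, Long> lastLive = new HashMap<>();

    void createSession(String sessionId, long now) {
        lastLive.put(sessionId, now);
    }

    // Each node reports the sessions it currently holds; unknown ids are ignored in this sketch.
    void notifySessionLiveliness(Collection<String> sessionIds, long now) {
        for (String id : sessionIds) {
            lastLive.computeIfPresent(id, (key, old) -> now);
        }
    }

    // Deletes and returns every session not reported live since the given timestamp.
    Collection<String> timeoutSessions(long notLiveSinceTimestamp) {
        List<String> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastLive.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < notLiveSinceTimestamp) {
                timedOut.add(entry.getKey());
                it.remove();
            }
        }
        return timedOut;
    }
}
```

A caller would typically compute notLiveSinceTimestamp as the current time minus the tolerated silence period.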
-
scavengeSessionRemnants
int scavengeSessionRemnants() throws ClusterStoreAndForward.DataAccessException
Shall be invoked on some kind of schedule (e.g. every hour on average). Scavenges all inboxes, outboxes and "request boxes" for any lingering data for closed and timed out sessions.
Returns:
how many items were scavenged, for logging. May return constant zero if not readily available.
Throws:
ClusterStoreAndForward.DataAccessException
-
storeMessageIdInInbox
void storeMessageIdInInbox(String matsSocketSessionId, String clientMessageId) throws ClusterStoreAndForward.MessageIdAlreadyExistsException, ClusterStoreAndForward.DataAccessException
Stores the incoming message Id, to avoid double delivery. If the Client MessageId (cmid) already exists, a ClusterStoreAndForward.MessageIdAlreadyExistsException will be raised - this implies that a double delivery has occurred.
Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to store the incoming message id.
clientMessageId - the client's message Id for the incoming message.
Throws:
ClusterStoreAndForward.MessageIdAlreadyExistsException - if the Client MessageId (cmid) already existed for this SessionId.
ClusterStoreAndForward.DataAccessException
-
updateMessageInInbox
void updateMessageInInbox(String matsSocketSessionId, String clientMessageId, String messageJson, byte[] messageBinary) throws ClusterStoreAndForward.DataAccessException
Stores the resulting message (envelope and binary), so that if the incoming message comes again (based on storeMessageIdInInbox(String, String) throwing ClusterStoreAndForward.MessageIdAlreadyExistsException), the result from the previous processing can be returned right away.
-
getMessageFromInbox
ClusterStoreAndForward.StoredInMessage getMessageFromInbox(String matsSocketSessionId, String clientMessageId) throws ClusterStoreAndForward.DataAccessException -
deleteMessageIdsFromInbox
void deleteMessageIdsFromInbox(String matsSocketSessionId, Collection<String> clientMessageIds) throws ClusterStoreAndForward.DataAccessException
Deletes the incoming message Ids, as we've established that the client will never try to send this particular message again.
Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to delete the incoming message id.
clientMessageIds - the client's message Ids for the incoming messages to delete.
Throws:
ClusterStoreAndForward.DataAccessException
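Taken together, the inbox methods support a simple double-delivery guard: record the Client MessageId first, and if it was already recorded, return the stored result instead of processing again. The sketch below is an assumption-laden illustration: the class InboxSketch, the handleIncoming method and the "echo" processing are hypothetical, and the exception is modelled as unchecked for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the inbox; illustrative only.
class InboxSketch {
    // Modelled as unchecked here purely for brevity.
    static class MessageIdAlreadyExistsException extends RuntimeException {
    }

    // "sessionId|cmid" -> stored result (null while no result has been stored yet)
    private final Map<String, String> inbox = new HashMap<>();
    int processedCount = 0;

    private static String key(String sessionId, String cmid) {
        return sessionId + "|" + cmid;
    }

    void storeMessageIdInInbox(String sessionId, String cmid) {
        if (inbox.containsKey(key(sessionId, cmid))) {
            throw new MessageIdAlreadyExistsException();
        }
        inbox.put(key(sessionId, cmid), null);
    }

    void updateMessageInInbox(String sessionId, String cmid, String resultJson) {
        inbox.put(key(sessionId, cmid), resultJson);
    }

    String getMessageFromInbox(String sessionId, String cmid) {
        return inbox.get(key(sessionId, cmid));
    }

    // Processes a message at most once per (session, cmid); repeats get the stored result.
    String handleIncoming(String sessionId, String cmid, String payload) {
        try {
            storeMessageIdInInbox(sessionId, cmid);
        } catch (MessageIdAlreadyExistsException e) {
            return getMessageFromInbox(sessionId, cmid); // double delivery detected
        }
        processedCount++;
        String result = "{\"echo\":\"" + payload + "\"}"; // stand-in for real processing
        updateMessageInInbox(sessionId, cmid, result);
        return result;
    }
}
```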
-
storeMessageInOutbox
Optional<ClusterStoreAndForward.CurrentNode> storeMessageInOutbox(String matsSocketSessionId, String serverMessageId, String clientMessageId, String traceId, MatsSocketServer.MessageType type, Long requestTimestamp, String envelope, String messageJson, byte[] messageBinary) throws ClusterStoreAndForward.DataAccessException, ClusterStoreAndForward.MessageIdAlreadyExistsException
Stores the message for the Session, returning the nodename for the node holding the session, if any. If the session is closed/timed out, the message won't be stored (i.e. dumped on the floor) and the return value is empty. The Server MessageId (smid) is set by the caller - and since this might have a collision, the method throws ClusterStoreAndForward.MessageIdAlreadyExistsException if the unique constraint fails. In this case, a new Server MessageId should be picked and the operation retried.
Parameters:
matsSocketSessionId - the matsSocketSessionId that the message is meant for.
serverMessageId - unique id for outbox'ed message within the MatsSocketSessionId. Must be long enough that it is extremely unlikely that it collides with another message within the same MatsSocketSessionId - but do remember that there will at any one time be pretty few messages in the outbox for a given MatsSocketSession.
clientMessageId - For Client Replies to requests from Server-to-Client, this is the Client's Message Id, which we need to send back with the Reply so that the Client can correlate the Request. (Nullable, since not all messages are replies).
traceId - the server-side traceId for this message.
type - the type of the outgoing message (RESOLVE, REJECT or PUB).
requestTimestamp - For Client Replies to requests from Server-to-Client, this is the timestamp we received the Request.
envelope - the JSON-serialized MatsSocket Envelope without the 'msg' field set.
messageJson - the JSON-serialized message - the piece that sits in the 'msg' field of the Envelope. (Nullable)
messageBinary - the binary part of an outgoing message. (Nullable)
Returns:
the current node holding MatsSocket Session, or empty if none.
Throws:
ClusterStoreAndForward.MessageIdAlreadyExistsException - if the Server MessageId (smid) already existed for this SessionId.
ClusterStoreAndForward.DataAccessException
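The "pick a new Server MessageId and try again" advice can be sketched as a bounded retry loop. The class SmidRetrySketch, the idGenerator supplier and the maxAttempts limit are assumptions introduced for illustration, and the exception is again modelled as unchecked to keep the example short.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical in-memory model of the outbox unique constraint; illustrative only.
class SmidRetrySketch {
    // Modelled as unchecked here purely for brevity.
    static class MessageIdAlreadyExistsException extends RuntimeException {
    }

    // Stand-in for the unique constraint on (sessionId, serverMessageId).
    private final Set<String> usedIds = new HashSet<>();

    void store(String sessionId, String smid, String message) {
        if (!usedIds.add(sessionId + "|" + smid)) {
            throw new MessageIdAlreadyExistsException();
        }
    }

    // Tries freshly generated ids until one is accepted, up to maxAttempts times.
    String storeWithRetry(String sessionId, String message, Supplier<String> idGenerator, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            String smid = idGenerator.get();
            try {
                store(sessionId, smid, message);
                return smid; // stored under this id
            } catch (MessageIdAlreadyExistsException e) {
                // Collision within this session: loop around and pick a new id.
            }
        }
        throw new IllegalStateException("No free Server MessageId after " + maxAttempts + " attempts.");
    }
}
```

In practice the generator would produce random ids; collisions are expected to be rare, since few messages sit in a session's outbox at any one time.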
-
getMessagesFromOutbox
List<ClusterStoreAndForward.StoredOutMessage> getMessagesFromOutbox(String matsSocketSessionId, int maxNumberOfMessages) throws ClusterStoreAndForward.DataAccessException
Fetch a set of messages, up to 'maxNumberOfMessages' - but do not include messages that have been attempted delivered already (marked with outboxMessagesAttemptedDelivery(String, Collection)). If outboxMessagesUnmarkAttemptedDelivery(String) is invoked, that mark will be unset.
Parameters:
matsSocketSessionId - the matsSocketSessionId that the message is meant for.
maxNumberOfMessages - the maximum number of messages to fetch.
Returns:
a list of json encoded messages destined for the WebSocket.
Throws:
ClusterStoreAndForward.DataAccessException
-
outboxMessagesAttemptedDelivery
void outboxMessagesAttemptedDelivery(String matsSocketSessionId, Collection<String> serverMessageIds) throws ClusterStoreAndForward.DataAccessException
Marks the specified messages as attempted delivered and notches the ClusterStoreAndForward.StoredOutMessage.getDeliveryCount() one up. If outboxMessagesUnmarkAttemptedDelivery(String) is invoked (typically on reconnect), the mark will be unset, but the delivery count will stay in place - this is to be able to abort delivery attempts if there is something wrong with the message.
Parameters:
matsSocketSessionId - the matsSocketSessionId that the serverMessageIds refers to.
serverMessageIds - which messages failed delivery.
Throws:
ClusterStoreAndForward.DataAccessException
-
outboxMessagesUnmarkAttemptedDelivery
void outboxMessagesUnmarkAttemptedDelivery(String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException
When this method is invoked, getMessagesFromOutbox(String, int) will again return messages that have previously been marked as attempted delivered with outboxMessagesAttemptedDelivery(String, Collection). Notice that the ClusterStoreAndForward.StoredOutMessage.getDeliveryCount() will not be reset, but the ClusterStoreAndForward.StoredOutMessage.getAttemptTimestamp() will now again return null.
Parameters:
matsSocketSessionId - the matsSocketSessionId whose messages now shall be attempted.
Throws:
ClusterStoreAndForward.DataAccessException
-
outboxMessagesComplete
void outboxMessagesComplete(String matsSocketSessionId, Collection<String> serverMessageIds) throws ClusterStoreAndForward.DataAccessException
States that the messages are delivered. Will typically delete the message.
Parameters:
matsSocketSessionId - the matsSocketSessionId that the serverMessageIds refers to.
serverMessageIds - which messages are complete.
Throws:
ClusterStoreAndForward.DataAccessException
-
outboxMessagesDeadLetterQueue
void outboxMessagesDeadLetterQueue(String matsSocketSessionId, Collection<String> serverMessageIds) throws ClusterStoreAndForward.DataAccessException
States that the messages overran the accepted number of delivery attempts.
Parameters:
matsSocketSessionId - the matsSocketSessionId that the serverMessageIds refers to.
serverMessageIds - which messages should be DLQed.
Throws:
ClusterStoreAndForward.DataAccessException
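The outbox methods above form a delivery cycle: fetch unattempted messages, mark them as attempted, then either complete them, unmark them (typically on reconnect) for another round, or dead-letter them. The following single-session sketch illustrates that state handling; the class OutboxCycleSketch, its shortened method names and the deadLetters list are hypothetical and do not reflect how any actual implementation stores its data.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of one session's outbox; illustrative only.
class OutboxCycleSketch {
    private static final class Stored {
        int deliveryCount;      // never reset by unmarking
        Long attemptTimestamp;  // null means "not currently marked as attempted"
    }

    private final Map<String, Stored> outbox = new LinkedHashMap<>();
    final List<String> deadLetters = new ArrayList<>();

    void store(String smid) {
        outbox.put(smid, new Stored());
    }

    // Returns up to 'max' ids of messages that are not marked as attempted.
    List<String> getMessagesFromOutbox(int max) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Stored> entry : outbox.entrySet()) {
            if (result.size() >= max) {
                break;
            }
            if (entry.getValue().attemptTimestamp == null) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    void attemptedDelivery(Collection<String> smids, long now) {
        for (String smid : smids) {
            Stored stored = outbox.get(smid);
            if (stored != null) {
                stored.deliveryCount++;
                stored.attemptTimestamp = now;
            }
        }
    }

    // Clears the attempt mark but keeps the delivery count.
    void unmarkAttemptedDelivery() {
        for (Stored stored : outbox.values()) {
            stored.attemptTimestamp = null;
        }
    }

    void complete(Collection<String> smids) {
        outbox.keySet().removeAll(smids);
    }

    void deadLetter(Collection<String> smids) {
        for (String smid : smids) {
            if (outbox.remove(smid) != null) {
                deadLetters.add(smid);
            }
        }
    }

    int deliveryCount(String smid) {
        return outbox.get(smid).deliveryCount;
    }
}
```

Keeping the delivery count across unmarking is what lets a caller decide, after some number of attempts, to move a message to the dead letter queue instead of retrying forever.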
-
storeRequestCorrelation
void storeRequestCorrelation(String matsSocketSessionId, String serverMessageId, long requestTimestamp, String replyTerminatorId, String correlationString, byte[] correlationBinary) throws ClusterStoreAndForward.DataAccessException -
getAndDeleteRequestCorrelation
Optional<ClusterStoreAndForward.RequestCorrelation> getAndDeleteRequestCorrelation(String matsSocketSessionId, String serverMessageId) throws ClusterStoreAndForward.DataAccessException
-