public interface ClusterStoreAndForward
getMessagesFromOutbox(String, int)
get messages}, and, more
importantly, register them as completed
. Wrt. multiple nodes,
this argument still holds, since only one node can hold a MatsSocketSession - the one that has the actual WebSocket
connection. I believe it is possible to construct a bad async situation here (connect to one node, authenticate, get
SessionId, immediately disconnect and perform reconnect, and do this until the current ClusterStoreAndForward
has the wrong idea of which node holds the Session) but this should at most result in the client screwing up for
himself (not getting messages). A Session is not registered until the client has authenticated, so this will never
lead to information leakage to other users. Such a situation will also resolve if the client again performs a
non-malicious reconnect. It is the server that constructs and holds SessionIds: A client cannot itself force the
server side to create a Session or SessionId - it can only reconnect to an existing SessionId that it was given
earlier.Modifier and Type | Interface and Description |
---|---|
static interface |
ClusterStoreAndForward.CurrentNode |
static class |
ClusterStoreAndForward.DataAccessException
If having problems accessing the underlying common data store.
|
static class |
ClusterStoreAndForward.MessageIdAlreadyExistsException
Thrown if the operation resulted in a Unique Constraint situation.
|
static interface |
ClusterStoreAndForward.RequestCorrelation |
static class |
ClusterStoreAndForward.SimpleCurrentNode |
static class |
ClusterStoreAndForward.SimpleRequestCorrelation |
static class |
ClusterStoreAndForward.SimpleStoredInMessage |
static class |
ClusterStoreAndForward.SimpleStoredOutMessage |
static interface |
ClusterStoreAndForward.StoredInMessage |
static interface |
ClusterStoreAndForward.StoredOutMessage |
static class |
ClusterStoreAndForward.WrongUserException
Thrown from
registerSessionAtThisNode(String, String, String, String, String, String) if the userId does
not match the original userId that created this session. |
Modifier and Type | Method and Description |
---|---|
void |
boot()
Start the
ClusterStoreAndForward , perform DB preparations and migrations. |
void |
closeSession(java.lang.String matsSocketSessionId)
Invoked when the client explicitly tells us that he closed this session, CLOSE_SESSION.
|
void |
deleteMessageIdsFromInbox(java.lang.String matsSocketSessionId,
java.util.Collection<java.lang.String> clientMessageIds)
Deletes the incoming message Ids, as we've established that the client will never try to send this particular
message again.
|
void |
deregisterSessionFromThisNode(java.lang.String matsSocketSessionId,
java.lang.String connectionId)
Deregisters a Session home when a WebSocket is closed.
|
java.util.Optional<ClusterStoreAndForward.RequestCorrelation> |
getAndDeleteRequestCorrelation(java.lang.String matsSocketSessionId,
java.lang.String serverMessageId) |
java.util.Optional<ClusterStoreAndForward.CurrentNode> |
getCurrentRegisteredNodeForSession(java.lang.String matsSocketSessionId) |
ClusterStoreAndForward.StoredInMessage |
getMessageFromInbox(java.lang.String matsSocketSessionId,
java.lang.String clientMessageId) |
java.util.List<ClusterStoreAndForward.StoredOutMessage> |
getMessagesFromOutbox(java.lang.String matsSocketSessionId,
int maxNumberOfMessages)
Fetch a set of messages, up to 'maxNumberOfMessages' - but do not include messages that have been attempted
delivered already (marked with
outboxMessagesAttemptedDelivery(String, Collection) ). |
java.util.List<MatsSocketServer.MatsSocketSessionDto> |
getSessions(boolean onlyActive,
java.lang.String userId,
java.lang.String appName,
java.lang.String appVersionAtOrAbove)
Direct implementation of
MatsSocketServer.getMatsSocketSessions(boolean, String, String, String) - go
read there for semantics. |
int |
getSessionsCount(boolean onlyActive,
java.lang.String userId,
java.lang.String appName,
java.lang.String appVersionAtOrAbove)
Direct implementation of
MatsSocketServer.getMatsSocketSessionsCount(boolean, String, String, String) -
go read there for semantics. |
boolean |
isSessionExists(java.lang.String matsSocketSessionId) |
void |
notifySessionLiveliness(java.util.Collection<java.lang.String> matsSocketSessionIds)
Shall be invoked on some kind of schedule (e.g.
|
void |
outboxMessagesAttemptedDelivery(java.lang.String matsSocketSessionId,
java.util.Collection<java.lang.String> serverMessageIds)
Marks the specified messages as attempted delivered and notches the
ClusterStoreAndForward.StoredOutMessage.getDeliveryCount()
one up. |
void |
outboxMessagesComplete(java.lang.String matsSocketSessionId,
java.util.Collection<java.lang.String> serverMessageIds)
States that the messages are delivered.
|
void |
outboxMessagesDeadLetterQueue(java.lang.String matsSocketSessionId,
java.util.Collection<java.lang.String> serverMessageIds)
States that the messages overran the accepted number of delivery attempts.
|
void |
outboxMessagesUnmarkAttemptedDelivery(java.lang.String matsSocketSessionId)
When this method is invoked,
getMessagesFromOutbox(String, int) will again return messages that has
previously been marked as attempted delivered with outboxMessagesAttemptedDelivery(String, Collection) . |
long |
registerSessionAtThisNode(java.lang.String matsSocketSessionId,
java.lang.String userId,
java.lang.String connectionId,
java.lang.String clientLibAndVersions,
java.lang.String appName,
java.lang.String appVersion)
Registers a Session home to this node - only one node can ever be home, so any old is deleted.
|
int |
scavengeSessionRemnants()
Shall be invoked on some kind of schedule (e.g.
|
void |
storeMessageIdInInbox(java.lang.String matsSocketSessionId,
java.lang.String clientMessageId)
Stores the incoming message Id, to avoid double delivery.
|
java.util.Optional<ClusterStoreAndForward.CurrentNode> |
storeMessageInOutbox(java.lang.String matsSocketSessionId,
java.lang.String serverMessageId,
java.lang.String clientMessageId,
java.lang.String traceId,
MatsSocketServer.MessageType type,
java.lang.Long requestTimestamp,
java.lang.String envelope,
java.lang.String messageJson,
byte[] messageBinary)
Stores the message for the Session, returning the nodename for the node holding the session, if any.
|
void |
storeRequestCorrelation(java.lang.String matsSocketSessionId,
java.lang.String serverMessageId,
long requestTimestamp,
java.lang.String replyTerminatorId,
java.lang.String correlationString,
byte[] correlationBinary) |
java.util.Collection<java.lang.String> |
timeoutSessions(long notLiveSinceTimestamp)
Shall be invoked on some kind of schedule (e.g.
|
void |
updateMessageInInbox(java.lang.String matsSocketSessionId,
java.lang.String clientMessageId,
java.lang.String messageJson,
byte[] messageBinary)
Stores the resulting message (envelope and binary), so that if the incoming messages comes again (based on
storeMessageIdInInbox(String, String) throwing ClusterStoreAndForward.MessageIdAlreadyExistsException ), the result from
the previous processing can be returned right away. |
void boot()
ClusterStoreAndForward
, perform DB preparations and migrations.long registerSessionAtThisNode(java.lang.String matsSocketSessionId, java.lang.String userId, java.lang.String connectionId, java.lang.String clientLibAndVersions, java.lang.String appName, java.lang.String appVersion) throws ClusterStoreAndForward.WrongUserException, ClusterStoreAndForward.DataAccessException
ClusterStoreAndForward.WrongUserException
is thrown if this does not match.matsSocketSessionId
- the SessionId for this connection.userId
- the UserId, as provided by AuthenticationPlugin
, that own this MatsSocketSessionIdconnectionId
- an id that is unique for this specific WebSocket Session (i.e. TCP Connection), so that if it closes,
a new registration will not be deregistered by the old MatsSocketSession realizing that it is closed
and then invoking deregisterSessionFromThisNode(String, String)
clientLibAndVersions
- the Client Library and Versions + runtime information for the Client.appName
- the AppName of the accessing Client appappVersion
- the AppVersion of the accessing Client appClusterStoreAndForward.WrongUserException
- if the userId provided does not match the original userId that created the session.ClusterStoreAndForward.DataAccessException
- if problems with underlying data store.java.util.Optional<ClusterStoreAndForward.CurrentNode> getCurrentRegisteredNodeForSession(java.lang.String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException
matsSocketSessionId
- the MatsSocketSessionId for which to find current session home.ClusterStoreAndForward.DataAccessException
boolean isSessionExists(java.lang.String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException
matsSocketSessionId
- the MatsSocketSessionId for which to check whether there is a sessionClusterStoreAndForward.DataAccessException
java.util.List<MatsSocketServer.MatsSocketSessionDto> getSessions(boolean onlyActive, java.lang.String userId, java.lang.String appName, java.lang.String appVersionAtOrAbove) throws ClusterStoreAndForward.DataAccessException
MatsSocketServer.getMatsSocketSessions(boolean, String, String, String)
- go
read there for semantics.data store
.ClusterStoreAndForward.DataAccessException
int getSessionsCount(boolean onlyActive, java.lang.String userId, java.lang.String appName, java.lang.String appVersionAtOrAbove) throws ClusterStoreAndForward.DataAccessException
MatsSocketServer.getMatsSocketSessionsCount(boolean, String, String, String)
-
go read there for semantics.data store
.ClusterStoreAndForward.DataAccessException
void deregisterSessionFromThisNode(java.lang.String matsSocketSessionId, java.lang.String connectionId) throws ClusterStoreAndForward.DataAccessException
matsSocketSessionId
- the MatsSocketSessionId for which to deregister the specific WebSocket Session's ConnectionId.connectionId
- an id that is unique for this specific WebSocket Session (i.e. TCP Connection), so that if it closes,
a new registration will not be deregistered by the old MatsSocketSession realizing that it is closed
and then invoking deregisterSessionFromThisNode(String, String)
ClusterStoreAndForward.DataAccessException
void closeSession(java.lang.String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException
matsSocketSessionId
- the MatsSocketSessionId that should be closed.ClusterStoreAndForward.DataAccessException
void notifySessionLiveliness(java.util.Collection<java.lang.String> matsSocketSessionIds) throws ClusterStoreAndForward.DataAccessException
ClusterStoreAndForward
about which Sessions are currently live on that node. Sessions that aren't live,
will be scavenged after some time by invocation of timeoutSessions(long)
.matsSocketSessionIds
- which sessions are currently live on the invoking node.ClusterStoreAndForward.DataAccessException
java.util.Collection<java.lang.String> timeoutSessions(long notLiveSinceTimestamp) throws ClusterStoreAndForward.DataAccessException
notified about liveliness
since the supplied timestamp
(millis from epoch).notLiveSinceTimestamp
- decides which sessions are too old: Sessions which have not been
notified about liveliness
since this timestamp will be
deleted.ClusterStoreAndForward.DataAccessException
int scavengeSessionRemnants() throws ClusterStoreAndForward.DataAccessException
ClusterStoreAndForward.DataAccessException
void storeMessageIdInInbox(java.lang.String matsSocketSessionId, java.lang.String clientMessageId) throws ClusterStoreAndForward.MessageIdAlreadyExistsException, ClusterStoreAndForward.DataAccessException
ClusterStoreAndForward.MessageIdAlreadyExistsException
will be raised - this implies that a double delivery has occurred.matsSocketSessionId
- the MatsSocketSessionId for which to store the incoming message id.clientMessageId
- the client's message Id for the incoming message.ClusterStoreAndForward.MessageIdAlreadyExistsException
- if the Client MessageId (cmid) already existed for this SessionId.ClusterStoreAndForward.DataAccessException
void updateMessageInInbox(java.lang.String matsSocketSessionId, java.lang.String clientMessageId, java.lang.String messageJson, byte[] messageBinary) throws ClusterStoreAndForward.DataAccessException
storeMessageIdInInbox(String, String)
throwing ClusterStoreAndForward.MessageIdAlreadyExistsException
), the result from
the previous processing can be returned right away.ClusterStoreAndForward.StoredInMessage getMessageFromInbox(java.lang.String matsSocketSessionId, java.lang.String clientMessageId) throws ClusterStoreAndForward.DataAccessException
void deleteMessageIdsFromInbox(java.lang.String matsSocketSessionId, java.util.Collection<java.lang.String> clientMessageIds) throws ClusterStoreAndForward.DataAccessException
matsSocketSessionId
- the MatsSocketSessionId for which to delete the incoming message id.clientMessageIds
- the client's message Ids for the incoming messages to delete.ClusterStoreAndForward.DataAccessException
java.util.Optional<ClusterStoreAndForward.CurrentNode> storeMessageInOutbox(java.lang.String matsSocketSessionId, java.lang.String serverMessageId, java.lang.String clientMessageId, java.lang.String traceId, MatsSocketServer.MessageType type, java.lang.Long requestTimestamp, java.lang.String envelope, java.lang.String messageJson, byte[] messageBinary) throws ClusterStoreAndForward.DataAccessException, ClusterStoreAndForward.MessageIdAlreadyExistsException
ClusterStoreAndForward.MessageIdAlreadyExistsException
if unique constraint fails. In this case, a new Server MessageId
should be picked and try again.matsSocketSessionId
- the matsSocketSessionId that the message is meant for.serverMessageId
- unique id for outbox'ed message within the MatsSocketSessionId. Must be long enough that it is
extremely unlikely that is collides with another message within the same
MatsSocketSessionId - but do remember that there will at any one time be pretty few messages in the
outbox for a given MatsSocketSession.clientMessageId
- For Client Replies to requests from Server-to-Client, this is the Client's Message Id, which we
need to send back with the Reply so that the Client can correlate the Request. (Nullable, since not
all messages are replies).traceId
- the server-side traceId for this message.type
- the type of the outgoing message (RESOLVE, REJECT or PUB).requestTimestamp
- For Client Replies to requests from Server-to-Client, this is the timestamp we received the
Request.envelope
- the JSON-serialized MatsSocket Envelope without the 'msg' field set.messageJson
- the JSON-serialized message - the piece that sits in the 'msg' field of the Envelope. (Nullable)messageBinary
- the binary part of an outgoing message. (Nullable)ClusterStoreAndForward.MessageIdAlreadyExistsException
- if the Server MessageId (smid) already existed for this SessionId.ClusterStoreAndForward.DataAccessException
java.util.List<ClusterStoreAndForward.StoredOutMessage> getMessagesFromOutbox(java.lang.String matsSocketSessionId, int maxNumberOfMessages) throws ClusterStoreAndForward.DataAccessException
outboxMessagesAttemptedDelivery(String, Collection)
). If
outboxMessagesUnmarkAttemptedDelivery(String)
is invoked, that mark will be unset.matsSocketSessionId
- the matsSocketSessionId that the message is meant for.maxNumberOfMessages
- the maximum number of messages to fetch.ClusterStoreAndForward.DataAccessException
void outboxMessagesAttemptedDelivery(java.lang.String matsSocketSessionId, java.util.Collection<java.lang.String> serverMessageIds) throws ClusterStoreAndForward.DataAccessException
ClusterStoreAndForward.StoredOutMessage.getDeliveryCount()
one up. If outboxMessagesUnmarkAttemptedDelivery(String)
is invoked (typically on reconnect), the mark
will be unset, but the delivery count will stay in place - this is to be able to abort delivery attempts if there
is something wrong with the message.matsSocketSessionId
- the matsSocketSessionId that the serverMessageIds refers to.serverMessageIds
- which messages failed delivery.ClusterStoreAndForward.DataAccessException
void outboxMessagesUnmarkAttemptedDelivery(java.lang.String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException
getMessagesFromOutbox(String, int)
will again return messages that has
previously been marked as attempted delivered with outboxMessagesAttemptedDelivery(String, Collection)
.
Notice that the ClusterStoreAndForward.StoredOutMessage.getDeliveryCount()
will not be reset, but the
ClusterStoreAndForward.StoredOutMessage.getAttemptTimestamp()
will now again return null.matsSocketSessionId
- the matsSocketSessionId whose messages now shall be attempted.ClusterStoreAndForward.DataAccessException
void outboxMessagesComplete(java.lang.String matsSocketSessionId, java.util.Collection<java.lang.String> serverMessageIds) throws ClusterStoreAndForward.DataAccessException
matsSocketSessionId
- the matsSocketSessionId that the serverMessageIds refers to.serverMessageIds
- which messages are complete.ClusterStoreAndForward.DataAccessException
void outboxMessagesDeadLetterQueue(java.lang.String matsSocketSessionId, java.util.Collection<java.lang.String> serverMessageIds) throws ClusterStoreAndForward.DataAccessException
matsSocketSessionId
- the matsSocketSessionId that the serverMessageIds refers to.serverMessageIds
- which messages should be DLQed.ClusterStoreAndForward.DataAccessException
void storeRequestCorrelation(java.lang.String matsSocketSessionId, java.lang.String serverMessageId, long requestTimestamp, java.lang.String replyTerminatorId, java.lang.String correlationString, byte[] correlationBinary) throws ClusterStoreAndForward.DataAccessException
java.util.Optional<ClusterStoreAndForward.RequestCorrelation> getAndDeleteRequestCorrelation(java.lang.String matsSocketSessionId, java.lang.String serverMessageId) throws ClusterStoreAndForward.DataAccessException