public class ClusterStoreAndForward_SQL extends java.lang.Object implements ClusterStoreAndForward
ClusterStoreAndForward_SQL_DbMigrations.
NOTE: This class relies heavily on Mats' MatsFactory.ContextLocal feature, whereby the current Mats
StageProcessor-contextual SQL Connection is available via the MatsFactory.ContextLocal.getAttribute(Class, String...)
method. This is because several of the methods on this interface are invoked within a Mats initiation or Stage process
lambda, and participating in the transactional demarcation established there is vital to achieving the guaranteed
delivery and exactly-once delivery semantics.

Nested classes/interfaces inherited from interface ClusterStoreAndForward:
ClusterStoreAndForward.CurrentNode, ClusterStoreAndForward.DataAccessException, ClusterStoreAndForward.MessageIdAlreadyExistsException, ClusterStoreAndForward.RequestCorrelation, ClusterStoreAndForward.SimpleCurrentNode, ClusterStoreAndForward.SimpleRequestCorrelation, ClusterStoreAndForward.SimpleStoredInMessage, ClusterStoreAndForward.SimpleStoredOutMessage, ClusterStoreAndForward.StoredInMessage, ClusterStoreAndForward.StoredOutMessage, ClusterStoreAndForward.WrongUserException

| Modifier | Constructor and Description |
|---|---|
| `protected` | `ClusterStoreAndForward_SQL(javax.sql.DataSource dataSource, java.lang.String nodename, java.time.Clock clock)` |

| Modifier and Type | Method and Description |
|---|---|
| `void` | `boot()` Start the ClusterStoreAndForward, perform DB preparations and migrations. |
| `void` | `closeSession(java.lang.String matsSocketSessionId)` Invoked when the client explicitly tells us that he closed this session, CLOSE_SESSION. |
| `static ClusterStoreAndForward_SQL` | `create(javax.sql.DataSource dataSource, java.lang.String nodename)` |
| `void` | `deleteMessageIdsFromInbox(java.lang.String matsSocketSessionId, java.util.Collection<java.lang.String> clientMessageIds)` Deletes the incoming message Ids, as we've established that the client will never try to send this particular message again. |
| `void` | `deregisterSessionFromThisNode(java.lang.String matsSocketSessionId, java.lang.String connectionId)` Deregisters a Session home when a WebSocket is closed. |
| `java.util.Optional<ClusterStoreAndForward.RequestCorrelation>` | `getAndDeleteRequestCorrelation(java.lang.String matsSocketSessionId, java.lang.String serverMessageId)` |
| `java.util.Optional<ClusterStoreAndForward.CurrentNode>` | `getCurrentRegisteredNodeForSession(java.lang.String matsSocketSessionId)` |
| `ClusterStoreAndForward.StoredInMessage` | `getMessageFromInbox(java.lang.String matsSocketSessionId, java.lang.String clientMessageId)` |
| `java.util.List<ClusterStoreAndForward.StoredOutMessage>` | `getMessagesFromOutbox(java.lang.String matsSocketSessionId, int maxNumberOfMessages)` Fetch a set of messages, up to 'maxNumberOfMessages' - but do not include messages that have been attempted delivered already (marked with ClusterStoreAndForward.outboxMessagesAttemptedDelivery(String, Collection)). |
| `java.util.List<MatsSocketServer.MatsSocketSessionDto>` | `getSessions(boolean onlyActive, java.lang.String userId, java.lang.String appName, java.lang.String appVersionAtOrAbove)` Direct implementation of MatsSocketServer.getMatsSocketSessions(boolean, String, String, String) - see that method for semantics. |
| `int` | `getSessionsCount(boolean onlyActive, java.lang.String userId, java.lang.String appName, java.lang.String appVersionAtOrAbove)` Direct implementation of MatsSocketServer.getMatsSocketSessionsCount(boolean, String, String, String) - see that method for semantics. |
| `boolean` | `isSessionExists(java.lang.String matsSocketSessionId)` |
| `void` | `notifySessionLiveliness(java.util.Collection<java.lang.String> matsSocketSessionIds)` Shall be invoked on some kind of schedule. |
| `void` | `outboxMessagesAttemptedDelivery(java.lang.String matsSocketSessionId, java.util.Collection<java.lang.String> serverMessageIds)` Marks the specified messages as attempted delivered and notches the ClusterStoreAndForward.StoredOutMessage.getDeliveryCount() one up. |
| `void` | `outboxMessagesComplete(java.lang.String matsSocketSessionId, java.util.Collection<java.lang.String> serverMessageIds)` States that the messages are delivered. |
| `void` | `outboxMessagesDeadLetterQueue(java.lang.String matsSocketSessionId, java.util.Collection<java.lang.String> serverMessageIds)` States that the messages overran the accepted number of delivery attempts. |
| `void` | `outboxMessagesUnmarkAttemptedDelivery(java.lang.String matsSocketSessionId)` When this method is invoked, ClusterStoreAndForward.getMessagesFromOutbox(String, int) will again return messages that have previously been marked as attempted delivered with ClusterStoreAndForward.outboxMessagesAttemptedDelivery(String, Collection). |
| `long` | `registerSessionAtThisNode(java.lang.String matsSocketSessionId, java.lang.String userId, java.lang.String connectionId, java.lang.String clientLibAndVersions, java.lang.String appName, java.lang.String appVersion)` Registers a Session home to this node - only one node can ever be home, so any old is deleted. |
| `int` | `scavengeSessionRemnants()` Shall be invoked on some kind of schedule. |
| `void` | `storeMessageIdInInbox(java.lang.String matsSocketSessionId, java.lang.String clientMessageId)` Stores the incoming message Id, to avoid double delivery. |
| `java.util.Optional<ClusterStoreAndForward.CurrentNode>` | `storeMessageInOutbox(java.lang.String matsSocketSessionId, java.lang.String serverMessageId, java.lang.String clientMessageId, java.lang.String traceId, MatsSocketServer.MessageType type, java.lang.Long requestTimestamp, java.lang.String envelope, java.lang.String messageJson, byte[] messageBinary)` Stores the message for the Session, returning the nodename for the node holding the session, if any. |
| `void` | `storeRequestCorrelation(java.lang.String matsSocketSessionId, java.lang.String serverMessageId, long requestTimestamp, java.lang.String replyTerminatorId, java.lang.String correlationString, byte[] correlationBinary)` |
| `java.util.Collection<java.lang.String>` | `timeoutSessions(long notLiveSinceTimestamp)` Shall be invoked on some kind of schedule. |
| `void` | `updateMessageInInbox(java.lang.String matsSocketSessionId, java.lang.String clientMessageId, java.lang.String envelopeWithMessage, byte[] messageBinary)` Stores the resulting message (envelope and binary), so that if the incoming message comes again (based on ClusterStoreAndForward.storeMessageIdInInbox(String, String) throwing ClusterStoreAndForward.MessageIdAlreadyExistsException), the result from the previous processing can be returned right away. |

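
The outbox methods summarized above describe a small delivery cycle: fetch unmarked messages, mark them as attempted, and then either complete them or unmark them (typically on reconnect) so they are fetched again. The following is a minimal in-memory sketch of that contract for a single session. It is not the SQL implementation; the class `OutboxSketch`, its `Msg` type and the shortened method names are hypothetical stand-ins, and the session id parameter is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the documented outbox semantics (one session only).
final class OutboxSketch {
    static final class Msg {
        final String serverMessageId;
        int deliveryCount;       // plays the role of StoredOutMessage.getDeliveryCount()
        Long attemptTimestamp;   // null means "not marked as attempted delivered"

        Msg(String serverMessageId) {
            this.serverMessageId = serverMessageId;
        }
    }

    private final Map<String, Msg> outbox = new LinkedHashMap<>();

    void store(String serverMessageId) {
        // Assumption: a duplicate id is rejected, standing in for the unique constraint.
        if (outbox.putIfAbsent(serverMessageId, new Msg(serverMessageId)) != null) {
            throw new IllegalStateException("Server MessageId already exists: " + serverMessageId);
        }
    }

    // Returns up to 'max' messages that are not currently marked as attempted.
    List<Msg> getMessagesFromOutbox(int max) {
        List<Msg> result = new ArrayList<>();
        for (Msg m : outbox.values()) {
            if (result.size() >= max) break;
            if (m.attemptTimestamp == null) result.add(m);
        }
        return result;
    }

    // Marks the messages as attempted and notches the delivery count one up.
    void attemptedDelivery(Collection<String> serverMessageIds, long nowMillis) {
        for (String id : serverMessageIds) {
            Msg m = outbox.get(id);
            if (m != null) {
                m.attemptTimestamp = nowMillis;
                m.deliveryCount++;
            }
        }
    }

    // Clears the attempt mark; the delivery count is deliberately kept.
    void unmarkAttemptedDelivery() {
        for (Msg m : outbox.values()) m.attemptTimestamp = null;
    }

    // Delivered messages leave the outbox.
    void complete(Collection<String> serverMessageIds) {
        for (String id : serverMessageIds) outbox.remove(id);
    }

    // Walks through one cycle and reports the observed sizes and delivery count.
    static int[] demo() {
        OutboxSketch box = new OutboxSketch();
        box.store("m1");
        box.store("m2");
        int firstFetch = box.getMessagesFromOutbox(10).size();
        box.attemptedDelivery(List.of("m1", "m2"), 1000L);
        int whileMarked = box.getMessagesFromOutbox(10).size();
        box.unmarkAttemptedDelivery();
        List<Msg> refetched = box.getMessagesFromOutbox(10);
        int countAfterUnmark = refetched.get(0).deliveryCount;
        box.complete(List.of("m1"));
        int afterComplete = box.getMessagesFromOutbox(10).size();
        return new int[] { firstFetch, whileMarked, refetched.size(), countAfterUnmark, afterComplete };
    }
}
```

Keeping the delivery count across an unmark is what lets a caller give up on a message that keeps failing, at which point `outboxMessagesDeadLetterQueue` would apply. How dead-lettered messages are stored is not described here, so the sketch leaves that step out.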
protected ClusterStoreAndForward_SQL(javax.sql.DataSource dataSource,
java.lang.String nodename,
java.time.Clock clock)
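
The constructor accepts a `java.time.Clock`, which suggests that time-dependent behaviour can be driven by a controlled clock, for instance in tests. As a hedged illustration, the sketch below models the liveliness contract described for `notifySessionLiveliness(Collection)` and `timeoutSessions(long)` with an in-memory map. The class `LivelinessSketch` and its `MutableClock` are hypothetical, and it is an assumption that the real implementation stamps liveliness using the supplied clock.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the liveliness/timeout contract.
final class LivelinessSketch {
    // A clock that only moves when told to, so the example is deterministic.
    static final class MutableClock extends Clock {
        private Instant now;

        MutableClock(Instant start) {
            this.now = start;
        }

        void advance(Duration d) {
            now = now.plus(d);
        }

        @Override public ZoneId getZone() { return ZoneOffset.UTC; }
        @Override public Clock withZone(ZoneId zone) { return this; }
        @Override public Instant instant() { return now; }
    }

    private final Clock clock;
    private final Map<String, Long> lastLiveMillis = new HashMap<>();

    LivelinessSketch(Clock clock) {
        this.clock = clock;
    }

    // Records "now" as the liveliness timestamp for every listed session.
    void notifySessionLiveliness(Collection<String> matsSocketSessionIds) {
        long now = clock.millis();
        for (String id : matsSocketSessionIds) lastLiveMillis.put(id, now);
    }

    // Deletes and returns sessions not notified about liveliness since the timestamp.
    Collection<String> timeoutSessions(long notLiveSinceTimestamp) {
        List<String> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastLiveMillis.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() < notLiveSinceTimestamp) {
                timedOut.add(e.getKey());
                it.remove();
            }
        }
        return timedOut;
    }

    static Collection<String> demo() {
        MutableClock clock = new MutableClock(Instant.ofEpochMilli(0));
        LivelinessSketch store = new LivelinessSketch(clock);
        store.notifySessionLiveliness(List.of("s1", "s2"));
        clock.advance(Duration.ofMinutes(10));
        store.notifySessionLiveliness(List.of("s2")); // only s2 is still live
        long cutoff = clock.millis() - Duration.ofMinutes(5).toMillis();
        return store.timeoutSessions(cutoff);
    }
}
```

In `demo()`, session `s1` was last reported live ten minutes before the cutoff is computed, so it is the only one timed out.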
public static ClusterStoreAndForward_SQL create(javax.sql.DataSource dataSource, java.lang.String nodename)
public void boot()
Description copied from interface: ClusterStoreAndForward
Start the ClusterStoreAndForward, perform DB preparations and migrations.

Specified by: boot in interface ClusterStoreAndForward

public long registerSessionAtThisNode(java.lang.String matsSocketSessionId,
java.lang.String userId,
java.lang.String connectionId,
java.lang.String clientLibAndVersions,
java.lang.String appName,
java.lang.String appVersion)
throws ClusterStoreAndForward.WrongUserException,
ClusterStoreAndForward.DataAccessException

Description copied from interface: ClusterStoreAndForward
Registers a Session home to this node - only one node can ever be home, so any old is deleted. A ClusterStoreAndForward.WrongUserException is thrown if the supplied userId does not match the original userId that created the session.

Specified by: registerSessionAtThisNode in interface ClusterStoreAndForward

Parameters:
matsSocketSessionId - the SessionId for this connection.
userId - the UserId, as provided by AuthenticationPlugin, that owns this MatsSocketSessionId.
connectionId - an id that is unique for this specific WebSocket Session (i.e. TCP Connection), so that if it closes, a new registration will not be deregistered by the old MatsSocketSession realizing that it is closed and then invoking ClusterStoreAndForward.deregisterSessionFromThisNode(String, String).
clientLibAndVersions - the Client Library and Versions + runtime information for the Client.
appName - the AppName of the accessing Client app.
appVersion - the AppVersion of the accessing Client app.

Throws:
ClusterStoreAndForward.WrongUserException - if the userId provided does not match the original userId that created the session.
ClusterStoreAndForward.DataAccessException - if problems with underlying data store.

public void deregisterSessionFromThisNode(java.lang.String matsSocketSessionId,
java.lang.String connectionId)
throws ClusterStoreAndForward.DataAccessException

Description copied from interface: ClusterStoreAndForward
Deregisters a Session home when a WebSocket is closed.

Specified by: deregisterSessionFromThisNode in interface ClusterStoreAndForward

Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to deregister the specific WebSocket Session's ConnectionId.
connectionId - an id that is unique for this specific WebSocket Session (i.e. TCP Connection), so that if it closes, a new registration will not be deregistered by the old MatsSocketSession realizing that it is closed and then invoking ClusterStoreAndForward.deregisterSessionFromThisNode(String, String).

Throws:
ClusterStoreAndForward.DataAccessException

public java.util.Optional<ClusterStoreAndForward.CurrentNode> getCurrentRegisteredNodeForSession(java.lang.String matsSocketSessionId)
throws ClusterStoreAndForward.DataAccessException

Specified by: getCurrentRegisteredNodeForSession in interface ClusterStoreAndForward

Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to find current session home.

Throws:
ClusterStoreAndForward.DataAccessException

public boolean isSessionExists(java.lang.String matsSocketSessionId)
throws ClusterStoreAndForward.DataAccessException

Specified by: isSessionExists in interface ClusterStoreAndForward

Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to check whether there is a session.

Throws:
ClusterStoreAndForward.DataAccessException

public java.util.List<MatsSocketServer.MatsSocketSessionDto> getSessions(boolean onlyActive,
java.lang.String userId,
java.lang.String appName,
java.lang.String appVersionAtOrAbove)
throws ClusterStoreAndForward.DataAccessException

Description copied from interface: ClusterStoreAndForward
Direct implementation of MatsSocketServer.getMatsSocketSessions(boolean, String, String, String) - see that method for semantics.

Specified by: getSessions in interface ClusterStoreAndForward

Throws:
ClusterStoreAndForward.DataAccessException

public int getSessionsCount(boolean onlyActive,
java.lang.String userId,
java.lang.String appName,
java.lang.String appVersionAtOrAbove)
throws ClusterStoreAndForward.DataAccessException

Description copied from interface: ClusterStoreAndForward
Direct implementation of MatsSocketServer.getMatsSocketSessionsCount(boolean, String, String, String) - see that method for semantics.

Specified by: getSessionsCount in interface ClusterStoreAndForward

Throws:
ClusterStoreAndForward.DataAccessException

public void closeSession(java.lang.String matsSocketSessionId)
throws ClusterStoreAndForward.DataAccessException

Description copied from interface: ClusterStoreAndForward
Invoked when the client explicitly tells us that he closed this session, CLOSE_SESSION.

Specified by: closeSession in interface ClusterStoreAndForward

Parameters:
matsSocketSessionId - the MatsSocketSessionId that should be closed.

Throws:
ClusterStoreAndForward.DataAccessException

public void notifySessionLiveliness(java.util.Collection<java.lang.String> matsSocketSessionIds)
throws ClusterStoreAndForward.DataAccessException

Description copied from interface: ClusterStoreAndForward
Shall be invoked on some kind of schedule to inform the ClusterStoreAndForward about which Sessions are currently live on the invoking node. Sessions that aren't live will be scavenged after some time by invocation of ClusterStoreAndForward.timeoutSessions(long).

Specified by: notifySessionLiveliness in interface ClusterStoreAndForward

Parameters:
matsSocketSessionIds - which sessions are currently live on the invoking node.

Throws:
ClusterStoreAndForward.DataAccessException

public java.util.Collection<java.lang.String> timeoutSessions(long notLiveSinceTimestamp)
throws ClusterStoreAndForward.DataAccessException

Description copied from interface: ClusterStoreAndForward
Shall be invoked on some kind of schedule. Times out Sessions that have not been notified about liveliness since the supplied timestamp (millis from epoch).

Specified by: timeoutSessions in interface ClusterStoreAndForward

Parameters:
notLiveSinceTimestamp - decides which sessions are too old: Sessions which have not been notified about liveliness since this timestamp will be deleted.

Throws:
ClusterStoreAndForward.DataAccessException

public int scavengeSessionRemnants()
throws ClusterStoreAndForward.DataAccessException

Description copied from interface: ClusterStoreAndForward
Shall be invoked on some kind of schedule.

Specified by: scavengeSessionRemnants in interface ClusterStoreAndForward

Throws:
ClusterStoreAndForward.DataAccessException

public void storeMessageIdInInbox(java.lang.String matsSocketSessionId,
java.lang.String clientMessageId)
throws ClusterStoreAndForward.MessageIdAlreadyExistsException,
ClusterStoreAndForward.DataAccessException

Description copied from interface: ClusterStoreAndForward
Stores the incoming message Id, to avoid double delivery. If the Client MessageId already exists for this Session, a ClusterStoreAndForward.MessageIdAlreadyExistsException will be raised - this implies that a double delivery has occurred.

Specified by: storeMessageIdInInbox in interface ClusterStoreAndForward

Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to store the incoming message id.
clientMessageId - the client's message Id for the incoming message.

Throws:
ClusterStoreAndForward.MessageIdAlreadyExistsException - if the Client MessageId (cmid) already existed for this SessionId.
ClusterStoreAndForward.DataAccessException

public void updateMessageInInbox(java.lang.String matsSocketSessionId,
java.lang.String clientMessageId,
java.lang.String envelopeWithMessage,
byte[] messageBinary)
throws ClusterStoreAndForward.DataAccessException

Description copied from interface: ClusterStoreAndForward
Stores the resulting message (envelope and binary), so that if the incoming message comes again (based on ClusterStoreAndForward.storeMessageIdInInbox(String, String) throwing ClusterStoreAndForward.MessageIdAlreadyExistsException), the result from the previous processing can be returned right away.

Specified by: updateMessageInInbox in interface ClusterStoreAndForward

Throws:
ClusterStoreAndForward.DataAccessException

public ClusterStoreAndForward.StoredInMessage getMessageFromInbox(java.lang.String matsSocketSessionId,
java.lang.String clientMessageId)
throws ClusterStoreAndForward.DataAccessException

Specified by: getMessageFromInbox in interface ClusterStoreAndForward

Throws:
ClusterStoreAndForward.DataAccessException

public void deleteMessageIdsFromInbox(java.lang.String matsSocketSessionId,
java.util.Collection<java.lang.String> clientMessageIds)
throws ClusterStoreAndForward.DataAccessException

Description copied from interface: ClusterStoreAndForward
Deletes the incoming message Ids, as we've established that the client will never try to send this particular message again.

Specified by: deleteMessageIdsFromInbox in interface ClusterStoreAndForward

Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to delete the incoming message id.
clientMessageIds - the client's message Ids for the incoming messages to delete.

Throws:
ClusterStoreAndForward.DataAccessException

public java.util.Optional<ClusterStoreAndForward.CurrentNode> storeMessageInOutbox(java.lang.String matsSocketSessionId,
java.lang.String serverMessageId,
java.lang.String clientMessageId,
java.lang.String traceId,
MatsSocketServer.MessageType type,
java.lang.Long requestTimestamp,
java.lang.String envelope,
java.lang.String messageJson,
byte[] messageBinary)
throws ClusterStoreAndForward.DataAccessException,
ClusterStoreAndForward.MessageIdAlreadyExistsException

Description copied from interface: ClusterStoreAndForward
Stores the message for the Session, returning the nodename for the node holding the session, if any. A ClusterStoreAndForward.MessageIdAlreadyExistsException is raised if the unique constraint fails. In this case, a new Server MessageId should be picked and the store attempted again.

Specified by: storeMessageInOutbox in interface ClusterStoreAndForward

Parameters:
matsSocketSessionId - the matsSocketSessionId that the message is meant for.
serverMessageId - unique id for outbox'ed message within the MatsSocketSessionId. Must be long enough that it is extremely unlikely that it collides with another message within the same MatsSocketSessionId - but do remember that there will at any one time be pretty few messages in the outbox for a given MatsSocketSession.
clientMessageId - For Client Replies to requests from Server-to-Client, this is the Client's Message Id, which we need to send back with the Reply so that the Client can correlate the Request. (Nullable, since not all messages are replies).
traceId - the server-side traceId for this message.
type - the type of the outgoing message (RESOLVE, REJECT or PUB).
requestTimestamp - For Client Replies to requests from Server-to-Client, this is the timestamp we received the Request.
envelope - the JSON-serialized MatsSocket Envelope without the 'msg' field set.
messageJson - the JSON-serialized message - the piece that sits in the 'msg' field of the Envelope. (Nullable)
messageBinary - the binary part of an outgoing message. (Nullable)

Throws:
ClusterStoreAndForward.MessageIdAlreadyExistsException - if the Server MessageId (smid) already existed for this SessionId.
ClusterStoreAndForward.DataAccessException

public java.util.List<ClusterStoreAndForward.StoredOutMessage> getMessagesFromOutbox(java.lang.String matsSocketSessionId,
int maxNumberOfMessages)
throws ClusterStoreAndForward.DataAccessException

Description copied from interface: ClusterStoreAndForward
Fetch a set of messages, up to 'maxNumberOfMessages' - but do not include messages that have been attempted delivered already (marked with ClusterStoreAndForward.outboxMessagesAttemptedDelivery(String, Collection)). If ClusterStoreAndForward.outboxMessagesUnmarkAttemptedDelivery(String) is invoked, that mark will be unset.

Specified by: getMessagesFromOutbox in interface ClusterStoreAndForward

Parameters:
matsSocketSessionId - the matsSocketSessionId that the message is meant for.
maxNumberOfMessages - the maximum number of messages to fetch.

Throws:
ClusterStoreAndForward.DataAccessException

public void outboxMessagesAttemptedDelivery(java.lang.String matsSocketSessionId,
java.util.Collection<java.lang.String> serverMessageIds)
throws ClusterStoreAndForward.DataAccessException

Description copied from interface: ClusterStoreAndForward
Marks the specified messages as attempted delivered and notches the ClusterStoreAndForward.StoredOutMessage.getDeliveryCount() one up. If ClusterStoreAndForward.outboxMessagesUnmarkAttemptedDelivery(String) is invoked (typically on reconnect), the mark will be unset, but the delivery count will stay in place - this is to be able to abort delivery attempts if there is something wrong with the message.

Specified by: outboxMessagesAttemptedDelivery in interface ClusterStoreAndForward

Parameters:
matsSocketSessionId - the matsSocketSessionId that the serverMessageIds refer to.
serverMessageIds - which messages failed delivery.

Throws:
ClusterStoreAndForward.DataAccessException

public void outboxMessagesUnmarkAttemptedDelivery(java.lang.String matsSocketSessionId)
throws ClusterStoreAndForward.DataAccessException

Description copied from interface: ClusterStoreAndForward
When this method is invoked, ClusterStoreAndForward.getMessagesFromOutbox(String, int) will again return messages that have previously been marked as attempted delivered with ClusterStoreAndForward.outboxMessagesAttemptedDelivery(String, Collection). Notice that the ClusterStoreAndForward.StoredOutMessage.getDeliveryCount() will not be reset, but the ClusterStoreAndForward.StoredOutMessage.getAttemptTimestamp() will now again return null.

Specified by: outboxMessagesUnmarkAttemptedDelivery in interface ClusterStoreAndForward

Parameters:
matsSocketSessionId - the matsSocketSessionId whose messages now shall be attempted.

Throws:
ClusterStoreAndForward.DataAccessException

public void outboxMessagesComplete(java.lang.String matsSocketSessionId,
java.util.Collection<java.lang.String> serverMessageIds)
throws ClusterStoreAndForward.DataAccessException

Description copied from interface: ClusterStoreAndForward
States that the messages are delivered.

Specified by: outboxMessagesComplete in interface ClusterStoreAndForward

Parameters:
matsSocketSessionId - the matsSocketSessionId that the serverMessageIds refer to.
serverMessageIds - which messages are complete.

Throws:
ClusterStoreAndForward.DataAccessException

public void outboxMessagesDeadLetterQueue(java.lang.String matsSocketSessionId,
java.util.Collection<java.lang.String> serverMessageIds)
throws ClusterStoreAndForward.DataAccessException

Description copied from interface: ClusterStoreAndForward
States that the messages overran the accepted number of delivery attempts.

Specified by: outboxMessagesDeadLetterQueue in interface ClusterStoreAndForward

Parameters:
matsSocketSessionId - the matsSocketSessionId that the serverMessageIds refer to.
serverMessageIds - which messages should be DLQed.

Throws:
ClusterStoreAndForward.DataAccessException

public void storeRequestCorrelation(java.lang.String matsSocketSessionId,
java.lang.String serverMessageId,
long requestTimestamp,
java.lang.String replyTerminatorId,
java.lang.String correlationString,
byte[] correlationBinary)
throws ClusterStoreAndForward.DataAccessException

Specified by: storeRequestCorrelation in interface ClusterStoreAndForward

Throws:
ClusterStoreAndForward.DataAccessException

public java.util.Optional<ClusterStoreAndForward.RequestCorrelation> getAndDeleteRequestCorrelation(java.lang.String matsSocketSessionId,
java.lang.String serverMessageId)
throws ClusterStoreAndForward.DataAccessException

Specified by: getAndDeleteRequestCorrelation in interface ClusterStoreAndForward

Throws:
ClusterStoreAndForward.DataAccessException
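
The inbox methods documented above (`storeMessageIdInInbox`, `updateMessageInInbox`, `getMessageFromInbox`) together describe a way to detect a redelivered client message and replay the earlier result instead of processing it twice. The sketch below illustrates that flow with an in-memory map. It is an illustration only, not the SQL implementation: `InboxSketch`, its nested exception and `handleIncoming` are hypothetical names, the stored result is reduced to a single String (the real method returns a `ClusterStoreAndForward.StoredInMessage`), and the binary part is omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the documented inbox de-duplication flow.
final class InboxSketch {
    // Stand-in for ClusterStoreAndForward.MessageIdAlreadyExistsException.
    static final class MessageIdAlreadyExists extends Exception {
        MessageIdAlreadyExists(String clientMessageId) {
            super("Client MessageId already exists: " + clientMessageId);
        }
    }

    // Key: session id plus client message id. Value: stored result, null until processed.
    private final Map<String, String> inbox = new HashMap<>();
    int processedCount; // how many times the "real" processing ran

    private static String key(String sessionId, String clientMessageId) {
        return sessionId + "\n" + clientMessageId;
    }

    void storeMessageIdInInbox(String sessionId, String clientMessageId) throws MessageIdAlreadyExists {
        String k = key(sessionId, clientMessageId);
        if (inbox.containsKey(k)) throw new MessageIdAlreadyExists(clientMessageId);
        inbox.put(k, null);
    }

    void updateMessageInInbox(String sessionId, String clientMessageId, String envelopeWithMessage) {
        inbox.put(key(sessionId, clientMessageId), envelopeWithMessage);
    }

    String getMessageFromInbox(String sessionId, String clientMessageId) {
        return inbox.get(key(sessionId, clientMessageId));
    }

    // Processes a message once; a redelivery returns the previously stored result.
    String handleIncoming(String sessionId, String clientMessageId, String payload) {
        try {
            storeMessageIdInInbox(sessionId, clientMessageId);
        } catch (MessageIdAlreadyExists e) {
            // Double delivery: replay the earlier result (null if it has not been stored yet).
            return getMessageFromInbox(sessionId, clientMessageId);
        }
        processedCount++;
        String reply = "reply-to:" + payload;
        updateMessageInInbox(sessionId, clientMessageId, reply);
        return reply;
    }
}
```

Calling `handleIncoming` twice with the same session and client message id runs the processing once and returns the same reply both times. In the real class, these calls are meant to take part in the surrounding Mats transaction, which the sketch does not model.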