1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.kuali.rice.ksb.messaging.dao.impl;
18
19 import org.kuali.rice.core.api.config.CoreConfigHelper;
20 import org.kuali.rice.core.api.exception.RiceRuntimeException;
21 import org.kuali.rice.core.api.util.RiceUtilities;
22 import org.kuali.rice.core.framework.persistence.jpa.criteria.Criteria;
23 import org.kuali.rice.core.framework.persistence.jpa.criteria.QueryByCriteria;
24 import org.kuali.rice.ksb.messaging.PersistedMessageBO;
25 import org.kuali.rice.ksb.messaging.PersistedMessagePayload;
26 import org.kuali.rice.ksb.messaging.dao.MessageQueueDAO;
27 import org.kuali.rice.ksb.util.KSBConstants;
28
29 import javax.persistence.EntityManager;
30 import javax.persistence.PersistenceContext;
31 import javax.persistence.Query;
32 import javax.xml.namespace.QName;
33 import java.util.List;
34 import java.util.Map;
35
36
37 public class MessageQueueDAOJpaImpl implements MessageQueueDAO {
38
39 private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(MessageQueueDAOJpaImpl.class);
40
41 @PersistenceContext
42 private EntityManager entityManager;
43
44 @SuppressWarnings("unchecked")
45 public List<PersistedMessageBO> findAll() {
46 if (LOG.isDebugEnabled()) {
47 LOG.debug("Returning all persisted messages");
48 }
49
50 Query query = entityManager.createNamedQuery("PersistedMessageBO.FindAll");
51 return (List<PersistedMessageBO>) query.getResultList();
52 }
53
54 @SuppressWarnings("unchecked")
55 public List<PersistedMessageBO> findAll(int maxRows) {
56 if (LOG.isDebugEnabled()) {
57 LOG.debug("Finding next " + maxRows + " messages");
58 }
59
60 Query query = entityManager.createNamedQuery("PersistedMessageBO.FindAll");
61 query.setMaxResults(maxRows);
62
63 return (List<PersistedMessageBO>) query.getResultList();
64 }
65
66
67 public PersistedMessagePayload findByPersistedMessageByRouteQueueId(Long routeQueueId) {
68 return (PersistedMessagePayload) entityManager.find(PersistedMessagePayload.class, routeQueueId);
69 }
70
71
72 public PersistedMessageBO findByRouteQueueId(Long routeQueueId) {
73 return (PersistedMessageBO) entityManager.find(PersistedMessageBO.class, routeQueueId);
74 }
75
76 @SuppressWarnings("unchecked")
77 public List<PersistedMessageBO> findByServiceName(QName serviceName, String methodName) {
78 if (LOG.isDebugEnabled()) {
79 LOG.debug("Finding messages for service name " + serviceName);
80 }
81
82 Query query = entityManager.createNamedQuery("PersistedMessageBO.FindByServiceName");
83 query.setParameter("serviceName", serviceName.toString());
84 query.setParameter("methodName", methodName);
85
86 return (List<PersistedMessageBO>) query.getResultList();
87 }
88
89 @SuppressWarnings("unchecked")
90 public List<PersistedMessageBO> findByValues(Map<String, String> criteriaValues, int maxRows) {
91 Criteria criteria = new Criteria(PersistedMessageBO.class.getName());
92 for(Map.Entry<String, String> entry : criteriaValues.entrySet()) {
93 criteria.eq(entry.getKey(), entry.getValue());
94 }
95
96 QueryByCriteria query = new QueryByCriteria(entityManager, criteria);
97
98 return query.toQuery().getResultList();
99 }
100
101 @SuppressWarnings("unchecked")
102 public List<PersistedMessageBO> getNextDocuments(Integer maxDocuments) {
103 String applicationId = CoreConfigHelper.getApplicationId();
104
105 Query query = entityManager.createNamedQuery("PersistedMessageBO.GetNextDocuments");
106 query.setParameter("applicationId", applicationId);
107 query.setParameter("queueStatus", KSBConstants.ROUTE_QUEUE_EXCEPTION);
108 query.setParameter("ipNumber", RiceUtilities.getIpNumber());
109
110 if (maxDocuments != null)
111 query.setMaxResults(maxDocuments);
112
113 return (List<PersistedMessageBO>) query.getResultList();
114 }
115
116
117 public void remove(PersistedMessageBO routeQueue) {
118 if (LOG.isDebugEnabled()) {
119 LOG.debug("Removing message " + routeQueue);
120 }
121 if (routeQueue.getRouteQueueId() == null) {
122 throw new RiceRuntimeException("can't delete a PersistedMessageBO with no id");
123 }
124
125 routeQueue = entityManager.merge(routeQueue);
126 entityManager.remove(routeQueue);
127
128 if (routeQueue.getPayload() != null) {
129 PersistedMessagePayload payload = entityManager.merge(routeQueue.getPayload());
130 entityManager.remove(payload);
131 }
132 }
133
134
135 public void save(PersistedMessageBO routeQueue) {
136 if (LOG.isDebugEnabled()) {
137 LOG.debug("Persisting message " + routeQueue);
138 }
139 PersistedMessageBO jpaInstance = entityManager.merge(routeQueue);
140 Long routeQueueId = jpaInstance.getRouteQueueId();
141 Integer verNo = jpaInstance.getLockVerNbr();
142 routeQueue.setRouteQueueId(routeQueueId);
143 routeQueue.setLockVerNbr(verNo);
144
145 if (routeQueue.getPayload() != null) {
146 routeQueue.getPayload().setRouteQueueId(routeQueueId);
147 entityManager.merge(routeQueue.getPayload());
148 }
149 }
150
151 public EntityManager getEntityManager() {
152 return this.entityManager;
153 }
154
155 public void setEntityManager(EntityManager entityManager) {
156 this.entityManager = entityManager;
157 }
158
159 }