1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging.dao.impl;
17
18 import org.kuali.rice.core.api.config.CoreConfigHelper;
19 import org.kuali.rice.core.api.exception.RiceRuntimeException;
20 import org.kuali.rice.core.api.util.RiceUtilities;
21 import org.kuali.rice.core.framework.persistence.jpa.criteria.Criteria;
22 import org.kuali.rice.core.framework.persistence.jpa.criteria.QueryByCriteria;
23 import org.kuali.rice.ksb.messaging.PersistedMessageBO;
24 import org.kuali.rice.ksb.messaging.PersistedMessagePayload;
25 import org.kuali.rice.ksb.messaging.dao.MessageQueueDAO;
26 import org.kuali.rice.ksb.util.KSBConstants;
27
28 import javax.persistence.EntityManager;
29 import javax.persistence.PersistenceContext;
30 import javax.persistence.Query;
31 import javax.xml.namespace.QName;
32 import java.util.List;
33 import java.util.Map;
34
35
36 public class MessageQueueDAOJpaImpl implements MessageQueueDAO {
37
38 private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(MessageQueueDAOJpaImpl.class);
39
40 @PersistenceContext
41 private EntityManager entityManager;
42
43 @SuppressWarnings("unchecked")
44 public List<PersistedMessageBO> findAll() {
45 if (LOG.isDebugEnabled()) {
46 LOG.debug("Returning all persisted messages");
47 }
48
49 Query query = entityManager.createNamedQuery("PersistedMessageBO.FindAll");
50 return (List<PersistedMessageBO>) query.getResultList();
51 }
52
53 @SuppressWarnings("unchecked")
54 public List<PersistedMessageBO> findAll(int maxRows) {
55 if (LOG.isDebugEnabled()) {
56 LOG.debug("Finding next " + maxRows + " messages");
57 }
58
59 Query query = entityManager.createNamedQuery("PersistedMessageBO.FindAll");
60 query.setMaxResults(maxRows);
61
62 return (List<PersistedMessageBO>) query.getResultList();
63 }
64
65
66 public PersistedMessagePayload findByPersistedMessageByRouteQueueId(Long routeQueueId) {
67 return (PersistedMessagePayload) entityManager.find(PersistedMessagePayload.class, routeQueueId);
68 }
69
70
71 public PersistedMessageBO findByRouteQueueId(Long routeQueueId) {
72 return (PersistedMessageBO) entityManager.find(PersistedMessageBO.class, routeQueueId);
73 }
74
75 @SuppressWarnings("unchecked")
76 public List<PersistedMessageBO> findByServiceName(QName serviceName, String methodName) {
77 if (LOG.isDebugEnabled()) {
78 LOG.debug("Finding messages for service name " + serviceName);
79 }
80
81 Query query = entityManager.createNamedQuery("PersistedMessageBO.FindByServiceName");
82 query.setParameter("serviceName", serviceName.toString());
83 query.setParameter("methodName", methodName);
84
85 return (List<PersistedMessageBO>) query.getResultList();
86 }
87
88 @SuppressWarnings("unchecked")
89 public List<PersistedMessageBO> findByValues(Map<String, String> criteriaValues, int maxRows) {
90 Criteria criteria = new Criteria(PersistedMessageBO.class.getName());
91 for(Map.Entry<String, String> entry : criteriaValues.entrySet()) {
92 criteria.eq(entry.getKey(), entry.getValue());
93 }
94
95 QueryByCriteria query = new QueryByCriteria(entityManager, criteria);
96
97 return query.toQuery().getResultList();
98 }
99
100 @SuppressWarnings("unchecked")
101 public List<PersistedMessageBO> getNextDocuments(Integer maxDocuments) {
102 String applicationId = CoreConfigHelper.getApplicationId();
103
104 Query query = entityManager.createNamedQuery("PersistedMessageBO.GetNextDocuments");
105 query.setParameter("applicationId", applicationId);
106 query.setParameter("queueStatus", KSBConstants.ROUTE_QUEUE_EXCEPTION);
107 query.setParameter("ipNumber", RiceUtilities.getIpNumber());
108
109 if (maxDocuments != null)
110 query.setMaxResults(maxDocuments);
111
112 return (List<PersistedMessageBO>) query.getResultList();
113 }
114
115
116 public void remove(PersistedMessageBO routeQueue) {
117 if (LOG.isDebugEnabled()) {
118 LOG.debug("Removing message " + routeQueue);
119 }
120 if (routeQueue.getRouteQueueId() == null) {
121 throw new RiceRuntimeException("can't delete a PersistedMessageBO with no id");
122 }
123
124 routeQueue = entityManager.merge(routeQueue);
125 entityManager.remove(routeQueue);
126
127 if (routeQueue.getPayload() != null) {
128 PersistedMessagePayload payload = entityManager.merge(routeQueue.getPayload());
129 entityManager.remove(payload);
130 }
131 }
132
133
134 public void save(PersistedMessageBO routeQueue) {
135 if (LOG.isDebugEnabled()) {
136 LOG.debug("Persisting message " + routeQueue);
137 }
138 PersistedMessageBO jpaInstance = entityManager.merge(routeQueue);
139 Long routeQueueId = jpaInstance.getRouteQueueId();
140 Integer verNo = jpaInstance.getLockVerNbr();
141 routeQueue.setRouteQueueId(routeQueueId);
142 routeQueue.setLockVerNbr(verNo);
143
144 if (routeQueue.getPayload() != null) {
145 routeQueue.getPayload().setRouteQueueId(routeQueueId);
146 entityManager.merge(routeQueue.getPayload());
147 }
148 }
149
150 public EntityManager getEntityManager() {
151 return this.entityManager;
152 }
153
154 public void setEntityManager(EntityManager entityManager) {
155 this.entityManager = entityManager;
156 }
157
158 }