1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging.dao.impl;
17
18 import org.kuali.rice.core.api.config.CoreConfigHelper;
19 import org.kuali.rice.core.api.exception.RiceRuntimeException;
20 import org.kuali.rice.core.api.util.RiceUtilities;
21 import org.kuali.rice.ksb.messaging.PersistedMessageBO;
22 import org.kuali.rice.ksb.messaging.PersistedMessagePayload;
23 import org.kuali.rice.ksb.messaging.dao.MessageQueueDAO;
24 import org.kuali.rice.ksb.util.KSBConstants;
25
26 import javax.persistence.EntityManager;
27 import javax.persistence.Query;
28 import javax.persistence.TypedQuery;
29 import javax.persistence.criteria.CriteriaBuilder;
30 import javax.persistence.criteria.CriteriaQuery;
31 import javax.persistence.criteria.Predicate;
32 import javax.persistence.criteria.Root;
33 import javax.xml.namespace.QName;
34 import java.util.List;
35 import java.util.Map;
36
37 public class MessageQueueDaoJpa implements MessageQueueDAO {
38
39 private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(MessageQueueDaoJpa.class);
40
41 private EntityManager entityManager;
42
43 public List<PersistedMessageBO> findAll() {
44 if (LOG.isDebugEnabled()) {
45 LOG.debug("Returning all persisted messages");
46 }
47
48 Query query = entityManager.createNamedQuery("PersistedMessageBO.FindAll");
49 return query.getResultList();
50 }
51
52 @SuppressWarnings("unchecked")
53 public List<PersistedMessageBO> findAll(int maxRows) {
54 if (LOG.isDebugEnabled()) {
55 LOG.debug("Finding next " + maxRows + " messages");
56 }
57
58 TypedQuery<PersistedMessageBO> query = entityManager.createNamedQuery("PersistedMessageBO.FindAll",
59 PersistedMessageBO.class);
60 query.setMaxResults(maxRows);
61
62 return query.getResultList();
63 }
64
65 public PersistedMessagePayload findByPersistedMessageByRouteQueueId(Long routeQueueId) {
66 return entityManager.find(PersistedMessagePayload.class, routeQueueId);
67 }
68
69 public PersistedMessageBO findByRouteQueueId(Long routeQueueId) {
70 return entityManager.find(PersistedMessageBO.class, routeQueueId);
71 }
72
73 public List<PersistedMessageBO> findByServiceName(QName serviceName, String methodName) {
74 if (LOG.isDebugEnabled()) {
75 LOG.debug("Finding messages for service name " + serviceName);
76 }
77
78 TypedQuery<PersistedMessageBO> query = entityManager.createNamedQuery("PersistedMessageBO.FindByServiceName",
79 PersistedMessageBO.class);
80 query.setParameter("serviceName", serviceName.toString());
81 query.setParameter("methodName", methodName);
82
83 return query.getResultList();
84 }
85
86 public List<PersistedMessageBO> findByValues(Map<String, String> criteriaValues, int maxRows) {
87 CriteriaBuilder builder = entityManager.getCriteriaBuilder();
88 CriteriaQuery<PersistedMessageBO> query = builder.createQuery(PersistedMessageBO.class);
89 Root<PersistedMessageBO> message = query.from(PersistedMessageBO.class);
90 Predicate predicate = builder.conjunction();
91 for (Map.Entry<String, String> entry : criteriaValues.entrySet()) {
92 predicate = builder.and(predicate, builder.equal(message.get(entry.getKey()), entry.getValue()));
93 }
94 query.where(predicate);
95 TypedQuery<PersistedMessageBO> typedQuery = entityManager.createQuery(query);
96 return typedQuery.getResultList();
97 }
98
99 public List<PersistedMessageBO> getNextDocuments(Integer maxDocuments) {
100 String applicationId = CoreConfigHelper.getApplicationId();
101
102 TypedQuery<PersistedMessageBO> query = entityManager.createNamedQuery("PersistedMessageBO.GetNextDocuments",
103 PersistedMessageBO.class);
104 query.setParameter("applicationId", applicationId);
105 query.setParameter("queueStatus", KSBConstants.ROUTE_QUEUE_EXCEPTION);
106 query.setParameter("ipNumber", RiceUtilities.getIpNumber());
107
108 if (maxDocuments != null) {
109 query.setMaxResults(maxDocuments);
110 }
111
112 return query.getResultList();
113 }
114
115 public void remove(PersistedMessageBO routeQueue) {
116 if (LOG.isDebugEnabled()) {
117 LOG.debug("Removing message " + routeQueue);
118 }
119 if (routeQueue.getRouteQueueId() == null) {
120 throw new RiceRuntimeException("can't delete a PersistedMessageBO with no id");
121 }
122
123 routeQueue = entityManager.merge(routeQueue);
124 entityManager.remove(routeQueue);
125
126 if (routeQueue.getPayload() != null) {
127 PersistedMessagePayload payload = entityManager.merge(routeQueue.getPayload());
128 entityManager.remove(payload);
129 }
130 }
131
132 public PersistedMessageBO save(PersistedMessageBO routeQueue) {
133 if (LOG.isDebugEnabled()) {
134 LOG.debug("Persisting message " + routeQueue);
135 }
136 routeQueue = entityManager.merge(routeQueue);
137 entityManager.flush();
138 PersistedMessagePayload payload = routeQueue.getPayload();
139 if (payload != null) {
140 payload.setRouteQueueId(routeQueue.getRouteQueueId());
141 payload = entityManager.merge(payload);
142 entityManager.flush();
143 routeQueue.setPayload(payload);
144 }
145 return routeQueue;
146 }
147
148 public EntityManager getEntityManager() {
149 return this.entityManager;
150 }
151
152 public void setEntityManager(EntityManager entityManager) {
153 this.entityManager = entityManager;
154 }
155
156 }