View Javadoc
1   /**
2    * Copyright 2005-2016 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.rice.ksb.messaging.dao.impl;
17  
18  import org.kuali.rice.core.api.config.CoreConfigHelper;
19  import org.kuali.rice.core.api.exception.RiceRuntimeException;
20  import org.kuali.rice.core.api.util.RiceUtilities;
21  import org.kuali.rice.ksb.messaging.PersistedMessageBO;
22  import org.kuali.rice.ksb.messaging.PersistedMessagePayload;
23  import org.kuali.rice.ksb.messaging.dao.MessageQueueDAO;
24  import org.kuali.rice.ksb.util.KSBConstants;
25  
26  import javax.persistence.EntityManager;
27  import javax.persistence.Query;
28  import javax.persistence.TypedQuery;
29  import javax.persistence.criteria.CriteriaBuilder;
30  import javax.persistence.criteria.CriteriaQuery;
31  import javax.persistence.criteria.Predicate;
32  import javax.persistence.criteria.Root;
33  import javax.xml.namespace.QName;
34  import java.util.List;
35  import java.util.Map;
36  
37  public class MessageQueueDaoJpa implements MessageQueueDAO {
38  
39      private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(MessageQueueDaoJpa.class);
40  
41      private EntityManager entityManager;
42  
43      public List<PersistedMessageBO> findAll() {
44          if (LOG.isDebugEnabled()) {
45              LOG.debug("Returning all persisted messages");
46          }
47  
48          Query query = entityManager.createNamedQuery("PersistedMessageBO.FindAll");
49          return query.getResultList();
50      }
51  
52      @SuppressWarnings("unchecked")
53      public List<PersistedMessageBO> findAll(int maxRows) {
54          if (LOG.isDebugEnabled()) {
55              LOG.debug("Finding next " + maxRows + " messages");
56          }
57  
58          TypedQuery<PersistedMessageBO> query = entityManager.createNamedQuery("PersistedMessageBO.FindAll",
59                  PersistedMessageBO.class);
60          query.setMaxResults(maxRows);
61  
62          return query.getResultList();
63      }
64  
65      public PersistedMessagePayload findByPersistedMessageByRouteQueueId(Long routeQueueId) {
66          return entityManager.find(PersistedMessagePayload.class, routeQueueId);
67      }
68  
69      public PersistedMessageBO findByRouteQueueId(Long routeQueueId) {
70          return entityManager.find(PersistedMessageBO.class, routeQueueId);
71      }
72  
73      public List<PersistedMessageBO> findByServiceName(QName serviceName, String methodName) {
74          if (LOG.isDebugEnabled()) {
75              LOG.debug("Finding messages for service name " + serviceName);
76          }
77  
78          TypedQuery<PersistedMessageBO> query = entityManager.createNamedQuery("PersistedMessageBO.FindByServiceName",
79                  PersistedMessageBO.class);
80          query.setParameter("serviceName", serviceName.toString());
81          query.setParameter("methodName", methodName);
82  
83          return query.getResultList();
84      }
85  
86      public List<PersistedMessageBO> findByValues(Map<String, String> criteriaValues, int maxRows) {
87          CriteriaBuilder builder = entityManager.getCriteriaBuilder();
88          CriteriaQuery<PersistedMessageBO> query = builder.createQuery(PersistedMessageBO.class);
89          Root<PersistedMessageBO> message = query.from(PersistedMessageBO.class);
90          Predicate predicate = builder.conjunction();
91          for (Map.Entry<String, String> entry : criteriaValues.entrySet()) {
92              predicate = builder.and(predicate, builder.equal(message.get(entry.getKey()), entry.getValue()));
93          }
94          query.where(predicate);
95          TypedQuery<PersistedMessageBO> typedQuery = entityManager.createQuery(query);
96          return typedQuery.getResultList();
97      }
98  
99      public List<PersistedMessageBO> getNextDocuments(Integer maxDocuments) {
100         String applicationId = CoreConfigHelper.getApplicationId();
101 
102         TypedQuery<PersistedMessageBO> query = entityManager.createNamedQuery("PersistedMessageBO.GetNextDocuments",
103                 PersistedMessageBO.class);
104         query.setParameter("applicationId", applicationId);
105         query.setParameter("queueStatus", KSBConstants.ROUTE_QUEUE_EXCEPTION);
106         query.setParameter("ipNumber", RiceUtilities.getIpNumber());
107 
108         if (maxDocuments != null) {
109             query.setMaxResults(maxDocuments);
110         }
111 
112         return query.getResultList();
113     }
114 
115     public void remove(PersistedMessageBO routeQueue) {
116         if (LOG.isDebugEnabled()) {
117             LOG.debug("Removing message " + routeQueue);
118         }
119         if (routeQueue.getRouteQueueId() == null) {
120             throw new RiceRuntimeException("can't delete a PersistedMessageBO with no id");
121         }
122 
123         routeQueue = entityManager.merge(routeQueue);
124         entityManager.remove(routeQueue);
125 
126         if (routeQueue.getPayload() != null) {
127             PersistedMessagePayload payload = entityManager.merge(routeQueue.getPayload());
128             entityManager.remove(payload);
129         }
130     }
131 
132     public PersistedMessageBO save(PersistedMessageBO routeQueue) {
133         if (LOG.isDebugEnabled()) {
134             LOG.debug("Persisting message " + routeQueue);
135         }
136         routeQueue = entityManager.merge(routeQueue);
137         entityManager.flush();
138         PersistedMessagePayload payload = routeQueue.getPayload();
139         if (payload != null) {
140             payload.setRouteQueueId(routeQueue.getRouteQueueId());
141             payload = entityManager.merge(payload);
142             entityManager.flush();
143             routeQueue.setPayload(payload);
144         }
145         return routeQueue;
146     }
147 
148     public EntityManager getEntityManager() {
149         return this.entityManager;
150     }
151 
152     public void setEntityManager(EntityManager entityManager) {
153         this.entityManager = entityManager;
154     }
155 
156 }