public boolean processInMessage()

in modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java [80:259]


	/**
	 * Handles an inbound MakeConnection message: looks for a stored
	 * {@link SenderBean} that is waiting for a transport (send flag set,
	 * transport not available, time-to-send not later than now) and, when one
	 * is found, hands it to {@code replyToPoll}.
	 *
	 * <p>When the message carries a security token and no sequence identifier,
	 * the search is restricted to sequences (RMS and RMD beans) that were
	 * stored with the same token recovery data; those sequences are tried one
	 * at a time, in random order, until one yields a sender bean or none are
	 * left.
	 *
	 * @param rmMsgCtx    the RM message context carrying the MakeConnection element
	 * @param transaction the transaction in effect on entry (may be {@code null});
	 *                    it is committed and replaced by a fresh one at the points
	 *                    marked below, and whatever transaction is still active
	 *                    when the method leaves is rolled back
	 * @return always {@code false}
	 * @throws AxisFault declared for failures raised by the calls made while
	 *                   processing
	 */
	public boolean processInMessage(RMMsgContext rmMsgCtx, Transaction transaction) throws AxisFault {
		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
			log.debug("Enter: MakeConnectionProcessor::processInMessage " + rmMsgCtx.getSOAPEnvelope().getBody());
		}

		try {
			MakeConnection makeConnection = rmMsgCtx.getMakeConnection();
			String address = makeConnection.getAddress();
			Identifier identifier = makeConnection.getIdentifier();

			// Neither an address nor an identifier was supplied: make the MissingSelection fault.
			if (address == null && identifier == null) {
				FaultManager.makeMissingSelectionFault(rmMsgCtx);
			}

			// An element we do not understand was supplied: make the UnsupportedSelection fault.
			if (makeConnection.getUnexpectedElement() != null) {
				FaultManager.makeUnsupportedSelectionFault(rmMsgCtx, makeConnection.getUnexpectedElement());
			}

			// Collaborators needed for the lookup.
			ConfigurationContext configCtx = rmMsgCtx.getConfigurationContext();
			StorageManager storage = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx.getAxisConfiguration());
			SecurityManager securityMgr = SandeshaUtil.getSecurityManager(configCtx);
			SecurityToken securityToken = securityMgr.getSecurityToken(rmMsgCtx.getMessageContext());

			// Sequences that may own a matching sender bean (only populated on the secured path).
			List<RMSequenceBean> candidateSequences = new ArrayList<RMSequenceBean>();
			int chosenSequenceIndex = -10;
			SenderBean senderCriteria = new SenderBean();
			boolean secured = false;

			if (securityToken != null && identifier == null) {
				secured = true;
				if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
					log.debug("token found " + securityToken);
				}

				// Scope the search to sequences that own the same token.
				String recoveryData = securityMgr.getTokenRecoveryData(securityToken);

				// RMS beans first ...
				RMSBean rmsCriteria = new RMSBean();
				rmsCriteria.setSecurityTokenData(recoveryData);
				rmsCriteria.setToEPR(address);
				List<RMSBean> rmsMatches = storage.getRMSBeanMgr().find(rmsCriteria);
				candidateSequences.addAll(rmsMatches);

				// ... then RMD beans, appended to the same list.
				RMDBean rmdCriteria = new RMDBean();
				rmdCriteria.setSecurityTokenData(recoveryData);
				rmdCriteria.setToAddress(address);
				List<RMDBean> rmdMatches = storage.getRMDBeanMgr().find(rmdCriteria);
				candidateSequences.addAll(rmdMatches);

				int candidateCount = candidateSequences.size();
				if (candidateCount < 1) {
					// No sequence carries these credentials, so the message cannot be processed under RSP.
					if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
						log.debug("Exit: MakeConnectionProcessor::processInMessage : no RM sequence bean with security credentials");
					}
					return false;
				}

				// Pick a starting sequence at random. TODO better method?
				chosenSequenceIndex = new Random().nextInt(candidateCount);
				RMSequenceBean chosenSequence = candidateSequences.get(chosenSequenceIndex);
				senderCriteria.setSequenceID(chosenSequence.getSequenceID());
				if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
					log.debug("sequence selected " + senderCriteria.getSequenceID());
				}

				// Commit to clear up the RMS/RMD beans held by the lookups above ...
				if (transaction != null && transaction.isActive()) {
					transaction.commit();
				}

				// ... and carry on in a new transaction.
				transaction = storage.getTransaction();
			}

			SenderBeanMgr senderBeanMgr = storage.getSenderBeanMgr();

			// Remaining selection criteria for the sender bean.
			senderCriteria.setSend(true);
			senderCriteria.setTransportAvailable(false);

			if (address != null) {
				senderCriteria.setToAddress(address);
			}

			if (identifier != null) {
				if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
					log.debug("identifier set, this violates RSP " + identifier);
				}
				senderCriteria.setSequenceID(identifier.getIdentifier());
			}

			SenderBean senderBean = null;
			boolean pending = false;
			while (true) {
				// Time-to-send is refreshed to "now" on every attempt.
				senderCriteria.setTimeToSend(System.currentTimeMillis());

				// The reSend flag is ignored by this selection, so it is filtered by hand below.
				Collection<SenderBean> matches = senderBeanMgr.find(senderCriteria);

				// Drop beans that were already sent once and are not marked for resending.
				Iterator<SenderBean> pruner = matches.iterator();
				while (pruner.hasNext()) {
					SenderBean candidate = pruner.next();
					if (candidate.isReSend()) {
						continue;
					}
					if (candidate.getSentCount() > 0) {
						pruner.remove();
					}
				}

				// Choose one survivor RANDOMLY. TODO- Should use a better mechanism.
				int matchCount = matches.size();
				int chosenItem = -1;
				if (matchCount > 0) {
					chosenItem = new Random().nextInt(matchCount);
				}

				// More than one deliverable message means the MessagePending header should be true.
				pending = matchCount > 1;

				// Advance to the chosen position; the loop does not run when nothing matched.
				Iterator<SenderBean> cursor = matches.iterator();
				senderBean = null;
				for (int position = 0; position <= chosenItem; position++) {
					senderBean = cursor.next();
				}

				if (senderBean != null) {
					break;
				}

				if (!secured) {
					if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
						log.debug("Exit: MakeConnectionProcessor::processInMessage, no matching message found");
					}
					// This property tells the synapse nhttp transport to send a 202 Accepted header.
					rmMsgCtx.getMessageContext().setProperty("FORCE_SC_ACCEPTED", Constants.VALUE_TRUE);
					return false;
				}

				// Secured: discard the sequence just tried (removal by index) and try another.
				candidateSequences.remove(chosenSequenceIndex);
				int remainingCount = candidateSequences.size();
				if (remainingCount < 1) {
					if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
						log.debug("Exit: MakeConnectionProcessor::processInMessage, no matching message found");
					}
					return false;
				}

				// Pick the next sequence at random. TODO better method?
				chosenSequenceIndex = new Random().nextInt(remainingCount);
				RMSequenceBean nextSequence = candidateSequences.get(chosenSequenceIndex);
				senderCriteria.setSequenceID(nextSequence.getSequenceID());
				if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
					log.debug("sequence selected " + senderCriteria.getSequenceID());
				}
			}

			// Commit the lookup work and reply inside a fresh transaction.
			if (transaction != null && transaction.isActive()) {
				transaction.commit();
				transaction = storage.getTransaction();
			}
			replyToPoll(rmMsgCtx, senderBean, storage, pending, makeConnection.getNamespaceValue(), transaction);

		} finally {
			// Anything still active at this point was not committed: roll it back.
			if (transaction != null && transaction.isActive()) {
				transaction.rollback();
			}
		}

		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
			log.debug("Exit: MakeConnectionProcessor::processInMessage");
		}
		return false;
	}