Revision 162:3312590ddb4f

View differences:

src/main/java/cz/agents/dimaptools/heuristic/relaxed/LazilyDistributedFFHeuristic.java
7 7
import org.apache.log4j.Logger;
8 8

  
9 9
import cz.agents.alite.communication.Communicator;
10
import cz.agents.alite.communication.Message;
11
import cz.agents.alite.communication.QueuedMessageHandler;
12
import cz.agents.alite.communication.content.Content;
10
import cz.agents.dimaptools.DIMAPWorldInterface;
13 11
import cz.agents.dimaptools.communication.message.HeuristicReplyWithPublicActionsMessage;
14 12
import cz.agents.dimaptools.communication.message.HeuristicRequestMessage;
13
import cz.agents.dimaptools.communication.protocol.DistributedHeuristicProtocol;
15 14
import cz.agents.dimaptools.experiment.DataAccumulator;
16 15
import cz.agents.dimaptools.heuristic.HeuristicResult;
17 16
import cz.agents.dimaptools.heuristic.relaxed.evaluator.FFEvaluator;
18 17
import cz.agents.dimaptools.model.Action;
19
import cz.agents.dimaptools.model.Problem;
20 18
import cz.agents.dimaptools.model.State;
21 19
import cz.agents.dimaptools.model.SuperState;
22 20

  
......
26 24

  
27 25
	private final String id;
28 26

  
27
	private final DistributedHeuristicProtocol protocol;
28
	private DistributedHeuristicProtocol otherProtocol = null;
29 29
	private final Communicator comm;
30
	private QueuedMessageHandler requestHandler = new QueuedMessageHandler(HeuristicRequestMessage.class);
31
    private QueuedMessageHandler replyHandler = new QueuedMessageHandler(HeuristicReplyWithPublicActionsMessage.class);
32

  
30
	
33 31
    private RelaxedPlan rp = new RelaxedPlan();
34 32
    RelaxationHeuristicRequest currentRequest = new RelaxationHeuristicRequest(null);
35 33

  
......
38 36

  
39 37
    private final int maxRecursionDepth;
40 38

  
41
    public LazilyDistributedFFHeuristic(Problem problem, Communicator comm,boolean receiveRequests) {
42
    	this(problem, comm, receiveRequests, Integer.MAX_VALUE);
39
    public LazilyDistributedFFHeuristic(DIMAPWorldInterface world,boolean receiveRequests) {
40
    	this(world, receiveRequests, Integer.MAX_VALUE);
43 41
    }
44 42

  
45
	public LazilyDistributedFFHeuristic(Problem problem, Communicator comm,boolean receiveRequests,int maxRecursionDepth) {
46
		super(problem,new FFEvaluator(problem),true);
43
	public LazilyDistributedFFHeuristic(DIMAPWorldInterface world,boolean receiveRequests,int maxRecursionDepth) {
44
		super(world.getProblem(),new FFEvaluator(world.getProblem()),true);
47 45

  
48
		this.comm = comm;
49
		this.maxRecursionDepth = maxRecursionDepth;
46
		this.comm = world.getCommunicator();
47
		this.maxRecursionDepth = maxRecursionDepth > -1 ? maxRecursionDepth : Integer.MAX_VALUE;
50 48

  
49
		protocol = new DistributedHeuristicProtocol(world.getCommunicator(),world.getAgentName(),world.getEncoder(),receiveRequests) {
50
			
51
			@Override
52
			public void receiveHeuristicRequestMessage(HeuristicRequestMessage req, String sender) {
53
				if(LOGGER.isDebugEnabled())LOGGER.debug(comm.getAddress() + "("+id+")" + " received request from "+sender+": " + req.getRequestHash());
54

  
55
            	processRequest(req,sender);
56
			}
57
			
58
			@Override
59
			public void receiveHeuristicReplyWithPublicActionsMessage(HeuristicReplyWithPublicActionsMessage re, String sender) {
60
				if(LOGGER.isDebugEnabled())LOGGER.debug(comm.getAddress() + "("+id+")" + " received reply from "+sender+": " + re.getRequestHash());
61

  
62
            	processReply(re,sender);
63
			}
64
		};
65
		
51 66
		if(receiveRequests){
52 67
			id = "receive";
53
			comm.addMessageHandler(requestHandler);
54 68
		}else{
55 69
			id = "send";
56
			comm.addMessageHandler(replyHandler);
57 70
		}
58 71

  
59 72
		LOGGER = Logger.getLogger(problem.agent + "." + RelaxationHeuristic.class);
60 73
	}
74
	
75
	public DistributedHeuristicProtocol getProtocol() {
76
		return protocol;
77
	}
78
	
79
	public void setOtherProtocol(DistributedHeuristicProtocol otherProtocol){
80
		this.otherProtocol = otherProtocol;
81
	}
61 82

  
62 83
	
63 84

  
......
190 211
		DataAccumulator.getAccumulator().heuristicRequestMessages ++;
191 212
		DataAccumulator.getAccumulator().totalBytes += req.getBytes();
192 213

  
193
		Message message = comm.createMessage(new Content(req));
194
        message.addReceiver(agent);
195
        comm.sendMessage(message);
214
		if(otherProtocol != null && agent.equals(comm.getAddress())){
215
			otherProtocol.receiveHeuristicRequestMessage(req, comm.getAddress());
216
		}else{
217
			protocol.sendHeuristicRequestMessage(req, agent);
218
		}
219
		
196 220
	}
197 221

  
198 222
	@Override
......
201 225
			LOGGER.warn("NOT INITIALIZED YET!");
202 226
		}
203 227

  
204

  
205

  
206
//		LOGGER.info(domain.agent + "("+id+")" + " requestHandler.queueSize():"+requestHandler.queueSize()+", replyHandler.queueSize():"+replyHandler.queueSize()+", currentRequest.waitingFor():"+currentRequest.waitingFor());
207
		boolean log = requestHandler.queueSize() > 0 || replyHandler.queueSize() > 0;
208

  
209
		if(log)if(LOGGER.isDebugEnabled())LOGGER.debug(comm.getAddress() + "("+id+")" + " process messages " +  requestHandler.queueSize() + "/" + replyHandler.queueSize() + " ...");
210

  
211

  
212

  
213
		while(requestHandler.hasMessage()){
214
            final Message message = requestHandler.pullMessage();
215

  
216

  
217
            	HeuristicRequestMessage req = (HeuristicRequestMessage)message.getContent().getData();
218

  
219
//            	if (message.getSender().equals(comm.getAddress())) {
220
//                	LOGGER.info(comm.getAddress() + "("+id+")" + " received request from self("+message.getSender()+"): " + req.getRequestHash());
221
//                }
222

  
223
            	if(LOGGER.isDebugEnabled())LOGGER.debug(comm.getAddress() + "("+id+")" + " received request from "+message.getSender()+": " + req.getRequestHash());
224

  
225
            	processRequest(req,message.getSender());
226

  
227
		}
228

  
229
		while(replyHandler.hasMessage()){
230
            final Message message = replyHandler.pullMessage();
231

  
232

  
233
            	HeuristicReplyWithPublicActionsMessage re = (HeuristicReplyWithPublicActionsMessage)message.getContent().getData();
234

  
235
//            	if (message.getSender().equals(comm.getAddress())) {
236
//                	LOGGER.info(comm.getAddress() + "("+id+")" + " received reply from self("+message.getSender()+"): " + re.getRequestHash());
237
//                }
238

  
239
            	if(LOGGER.isDebugEnabled())LOGGER.debug(comm.getAddress() + "("+id+")" + " received reply from "+message.getSender()+": " + re.getRequestHash());
240

  
241
            	processReply(re,message.getSender());
242
//            }
243
		}
244

  
245

  
246

  
247
//		LOGGER.info(domain.agent + "("+id+")" + "DONE: requestHandler.queueSize():"+requestHandler.queueSize()+", replyHandler.queueSize():"+replyHandler.queueSize()+", requests.size():"+requests.size());
248

  
249
		if(log)if(LOGGER.isDebugEnabled())LOGGER.debug(comm.getAddress() + "("+id+")" + " process messages done");
250

  
251 228
		while(currentRequest.waitingFor() == 0 && localRequests.size() > 0){
252 229
			LocalHeuristicRequest lr = localRequests.pollLast();
253 230
//			LOGGER.info(comm.getAddress() + "("+id+") localRequests: " + localRequests.size());
......
308 285
			DataAccumulator.getAccumulator().heuristicReplyMessages ++;
309 286
			DataAccumulator.getAccumulator().totalBytes += re.getBytes();
310 287

  
311
			Message message = comm.createMessage(new Content(re));
312
			message.addReceiver(sender);
313
			comm.sendMessage(message);
288
			if(otherProtocol != null && sender.equals(comm.getAddress())){
289
				otherProtocol.receiveHeuristicReplyWithPublicActionsMessage(re, comm.getAddress());
290
			}else{
291
				protocol.sendHeuristicReplyWithPublicActionsMessage(re, sender);
292
			}
314 293
		}
315 294

  
316 295
	}
317 296

  
297
	
298

  
318 299
}
src/test/java/cz/agents/dimaptools/relaxed/TestLazilyDistributedFFHeuristic.java
28 28
		DistributedBestFirstSearch search = new DistributedBestFirstSearch(world);
29 29
//		AStar search = new AStar(problem);
30 30

  
31
		HeuristicInterface heuristic = new LazilyDistributedFFHeuristic(world.getProblem(),world.getCommunicator(),false);
32
		HeuristicInterface reqHeuristic = new LazilyDistributedFFHeuristic(world.getProblem(),world.getCommunicator(),true);
31
		LazilyDistributedFFHeuristic heuristic = new LazilyDistributedFFHeuristic(world,false);
32
		LazilyDistributedFFHeuristic reqHeuristic = new LazilyDistributedFFHeuristic(world,true);
33
		heuristic.setOtherProtocol(reqHeuristic.getProtocol());
34
		reqHeuristic.setOtherProtocol(heuristic.getProtocol());
33 35

  
34 36
		search.plan(new MapConfiguration("heuristic",heuristic,"requestHeuristic",reqHeuristic), searchCallback);
35 37
	}

Also available in: Unified diff