Revision 193:fd2a0fe6eb32

View differences:

src/main/java/cz/agents/dimaptools/heuristic/relaxed/RecursiveDistributedRelaxationRequestHeuristic.java
98 98
    public void setReplyProtocol(DistributedHeuristicReplyProtocol replyProtocol){
99 99
        this.replyProtocol = replyProtocol;
100 100
    }
101
    
102
    public boolean isComputing(){
103
    	return requests.size() > 0;
104
    }
105
    
106
    public boolean hasWaitingLocalRequests(){
107
    	return localRequests.size() > 0;
108
    }
101 109

  
102 110

  
103 111
    @Override
src/main/java/cz/agents/dimaptools/search/GlobalLocalDistributedBestFirstSearch.java
1
package cz.agents.dimaptools.search;
2

  
3
import gnu.trove.TIntHashSet;
4
import gnu.trove.TIntObjectHashMap;
5

  
6
import java.util.Arrays;
7
import java.util.Comparator;
8
import java.util.HashSet;
9
import java.util.LinkedList;
10
import java.util.List;
11
import java.util.Set;
12
import java.util.concurrent.PriorityBlockingQueue;
13

  
14
import org.apache.log4j.Logger;
15

  
16
import cz.agents.alite.communication.CommunicationPerformer;
17
import cz.agents.alite.communication.Communicator;
18
import cz.agents.alite.configurator.ConfigurationInterface;
19
import cz.agents.dimaptools.DIMAPWorldInterface;
20
import cz.agents.dimaptools.communication.message.PlanningFinishedMessage;
21
import cz.agents.dimaptools.communication.message.ReconstructPlanMessage;
22
import cz.agents.dimaptools.communication.message.StateMessage;
23
import cz.agents.dimaptools.communication.protocol.DistributedSearchProtocol;
24
import cz.agents.dimaptools.experiment.DataAccumulator;
25
import cz.agents.dimaptools.heuristic.HeuristicInterface;
26
import cz.agents.dimaptools.heuristic.HeuristicInterface.HeuristicComputedCallback;
27
import cz.agents.dimaptools.heuristic.HeuristicResult;
28
import cz.agents.dimaptools.heuristic.goalsat.GoalSatHeuristic;
29
import cz.agents.dimaptools.heuristic.relaxed.RecursiveDistributedRelaxationReplyHeuristic;
30
import cz.agents.dimaptools.heuristic.relaxed.RecursiveDistributedRelaxationRequestHeuristic;
31
import cz.agents.dimaptools.heuristic.relaxed.RelaxationHeuristic;
32
import cz.agents.dimaptools.model.Action;
33
import cz.agents.dimaptools.model.Problem;
34

  
35
/**
36
 * Simple implementation of distributed Best-First Search with deferred heuristic evaluation.
37
 * config:
38
 * 	"heuristic" - implementation of HeuristicInterface used to guide the search
39
 * @author stolba
40
 *
41
 */
42
public class GlobalLocalDistributedBestFirstSearch implements SearchInterface {
43

  
44
    private static final Logger LOGGER = Logger.getLogger(GlobalLocalDistributedBestFirstSearch.class);
45

  
46
    protected final Problem problem;
47
    protected final PriorityBlockingQueue<SearchState> openGlobal;
48
    protected final PriorityBlockingQueue<SearchState> openLocal;
49
    protected final TIntHashSet closed = new TIntHashSet();
50

  
51
    protected RelaxationHeuristic localHeuristic;
52
    protected RecursiveDistributedRelaxationRequestHeuristic requestGlobalHeuristic;
53
    protected RecursiveDistributedRelaxationReplyHeuristic replyGlobalHeuristic;
54
    
55
    protected SearchCallback planCallback;
56

  
57
    protected boolean recomputeHeuristicOnReceive = false;
58

  
59
    protected final Communicator comm;
60
    protected final CommunicationPerformer commPerformer;
61
    protected final TIntObjectHashMap sentStates = new TIntObjectHashMap();
62

  
63
    protected final DistributedSearchProtocol protocol;
64

  
65
    protected String bestReconstructedPlanBy = null;
66
    protected int bestReconstructedPlanCost = Integer.MAX_VALUE;
67

  
68
    protected long timeLimitMs = Long.MAX_VALUE;
69

  
70
    protected volatile boolean run = true;
71
    protected volatile boolean search = true;
72

  
73
	protected int minH = Integer.MAX_VALUE;
74
	protected int maxG = 0;
75

  
76

  
77

  
78

  
79
    public GlobalLocalDistributedBestFirstSearch(DIMAPWorldInterface world) {
80
        this(world,Long.MAX_VALUE);
81
    }
82

  
83
    public GlobalLocalDistributedBestFirstSearch(DIMAPWorldInterface world,long timeLimitMs) {
84
        this.problem = world.getProblem();
85
        this.comm = world.getCommunicator();
86
        commPerformer = world.getCommPerformer();
87
        this.timeLimitMs = timeLimitMs;
88

  
89
        
90
        openLocal = new PriorityBlockingQueue<SearchState>(10000,new Comparator<SearchState>(){
91

  
92
			@Override
93
			public int compare(SearchState arg0, SearchState arg1) {
94
				return arg0.getHeuristic() - arg1.getHeuristic();
95
			}
96
        	
97
        });
98
        
99
        openGlobal = new PriorityBlockingQueue<SearchState>(10000,new Comparator<SearchState>(){
100

  
101
			@Override
102
			public int compare(SearchState arg0, SearchState arg1) {
103
				return arg0.getHeuristic() - arg1.getHeuristic();
104
			}
105
        	
106
        });
107

  
108
        protocol = new DistributedSearchProtocol(comm, world.getAgentName(), world.getEncoder()) {
109

  
110
            @Override
111
            public void receiveStateMessage(StateMessage sm, String sender) {
112
                if (!closed.contains(sm.getHash())) {
113

  
114
                    if(LOGGER.isDebugEnabled())LOGGER.debug(comm.getAddress() + " receive state " + Arrays.toString(sm.getValues()));
115

  
116
                    addReceivedState(sm, sender);
117

  
118
                }
119
            }
120

  
121
            @Override
122
            public void receiveReconstructPlanMessage(ReconstructPlanMessage rpm) {
123
                SearchState state = (SearchState)sentStates.get(rpm.getLastStateHash());
124

  
125
//            	LOGGER.info(comm.getAddress() + " receive reconstruct msg " + state.hashCode() );
126

  
127
                reconstructPlan(state, rpm.getPlan(),rpm.getInitiatorID(),rpm.getSolutionCost());
128

  
129
//            	run = false;
130
                search = false;
131
            }
132

  
133
            @Override
134
            public void receivePlanningFinishedMessage(PlanningFinishedMessage msg) {
135

  
136
                LOGGER.warn(comm.getAddress() + " receive PLANNING_FINISHED!" );
137

  
138

  
139
                run = false;
140

  
141
            }
142
        };
143

  
144

  
145
    }
146

  
147

  
148
    protected void addReceivedState(final StateMessage sm, String sender){
149
    	
150
    	final SearchState newState = new SearchState(problem.initState.getDomain(),sm,sender);
151
    		
152
    	localHeuristic.getHeuristic(newState, new HeuristicComputedCallback() {
153
				
154
				@Override
155
				public void heuristicComputed(HeuristicResult result) {
156
					newState.setHeuristics(Math.max(result.getValue(), sm.getH()));
157
					openLocal.add(newState);
158
				}
159
		});
160
    		
161
    	openGlobal.add(new SearchState(problem.initState.getDomain(),sm,sender));
162
    	
163
    	
164
    }
165

  
166
    
167
    protected void readConfiguration(ConfigurationInterface config){
168
    	if(config.containsKey("localHeuristic")){
169
            localHeuristic =  (RelaxationHeuristic) config.getObject("localHeuristic");
170
        }
171
    	
172
    	if(config.containsKey("requestGlobalHeuristic")){
173
            requestGlobalHeuristic =  (RecursiveDistributedRelaxationRequestHeuristic) config.getObject("requestGlobalHeuristic");
174
        }
175

  
176
        if(config.containsKey("replyGlobalHeuristic")){
177
            replyGlobalHeuristic = (RecursiveDistributedRelaxationReplyHeuristic) config.getObject("replyGlobalHeuristic");
178
        }
179
        
180
        recomputeHeuristicOnReceive = config.getBoolean("recomputeHeuristicOnReceive", false);
181
    }
182

  
183

  
184
    @Override
185
    public void plan(ConfigurationInterface config, SearchCallback planCallback) {
186

  
187
    	readConfiguration(config);
188

  
189
        this.planCallback = planCallback;
190

  
191
        openGlobal.add(new SearchState(problem.initState));
192

  
193
        do {
194
        	if(search){
195
        		final SearchState state;
196
        		
197
        		if(!openGlobal.isEmpty()){
198
        			state = openGlobal.poll();
199
        			
200
        			if(state.getHeuristic() < minH){
201
    	              	minH = state.getHeuristic();
202
    	               	if(LOGGER.isInfoEnabled())LOGGER.info(problem.agent + ": Reached new minimal [" + state.getParentActionOwner() + "] /h/: " + minH);
203
    	            }
204
    	            if(state.getG() > maxG){
205
    	               	maxG = state.getG();
206
    	               	if(LOGGER.isInfoEnabled())LOGGER.info(problem.agent + ": Reached new maximal [" + state.getParentActionOwner() + "] /g/: " + maxG);
207
    	            }
208
    	            
209
    	            if (state != null && !closed.contains(state.hashCode())) {
210

  
211
                        closed.add(state.hashCode());
212

  
213
                        if (solutionFound(state)){
214
                            if (state.wasExpandedByMe(problem.agent)) {
215
                                long time =  System.currentTimeMillis() - DataAccumulator.getAccumulator().startTimeMs;
216
                                LOGGER.warn(" by me ("+problem.agent+") - " + time);
217

  
218
                                List<String> plan = new LinkedList<String>();
219
                                reconstructPlan(state, plan, problem.agent, state.getG());
220

  
221
                                search = false;
222
                            }
223
                        }
224
                        
225
                        if(!requestGlobalHeuristic.hasWaitingLocalRequests()){
226
        	            	requestGlobalHeuristic.getHeuristic(state, new HeuristicComputedCallback(){
227

  
228
                                @Override
229
                                public void heuristicComputed(HeuristicResult result) {
230
                                    if(result.getValue() >= HeuristicInterface.LARGE_HEURISTIC){
231
                                        return;
232
                                    }
233
                                    
234
                                    state.setHeuristics(result.getValue());
235

  
236
                                    if (state.wasReachedByPublicAction()) {
237
                                        sendState(state);
238
                                    }
239

  
240
                                    DataAccumulator.getAccumulator().expandedStates++;
241

  
242
                                    Set<SearchState> expandedStates = expand(state);
243
                                    openGlobal.addAll(expandedStates);
244
                                }
245
                            });
246
        	            }
247
            		}
248
    	            
249
    	            
250
        		}else{
251
        			state = null;
252
        		}
253
        		
254
        		while(openGlobal.isEmpty() || requestGlobalHeuristic.hasWaitingLocalRequests()){
255
        			final SearchState localState;
256
        			
257
        			if(state != null){
258
        				localState = state;
259
        			}else{
260
        				localState = openLocal.poll();
261
        			}
262
        			
263
        			if (state != null && !closed.contains(localState.hashCode())) {
264

  
265
                        closed.add(localState.hashCode());
266

  
267
                        if (solutionFound(localState)){
268
                            if (localState.wasExpandedByMe(problem.agent)) {
269
                                long time =  System.currentTimeMillis() - DataAccumulator.getAccumulator().startTimeMs;
270
                                LOGGER.warn(" by me ("+problem.agent+") - " + time);
271

  
272
                                List<String> plan = new LinkedList<String>();
273
                                reconstructPlan(localState, plan, problem.agent, localState.getG());
274

  
275
                                search = false;
276
                            }
277
                        }
278
        			
279
		            	localHeuristic.getHeuristic(localState, new HeuristicComputedCallback(){
280
	
281
	                        @Override
282
	                        public void heuristicComputed(HeuristicResult result) {
283
	                            if(result.getValue() >= HeuristicInterface.LARGE_HEURISTIC){
284
	                                return;
285
	                            }
286
	                            
287
	                            localState.setHeuristics(result.getValue());
288
	
289
	                            if (localState.wasReachedByPublicAction()) {
290
	                                sendState(localState);
291
	                            }
292
	
293
	                            DataAccumulator.getAccumulator().expandedStates++;
294
	
295
	                            Set<SearchState> expandedStates = expand(localState);
296
	                            openLocal.addAll(expandedStates);
297
	                        }
298
	                    });
299
        			}
300
        			
301
        			if(System.currentTimeMillis() - DataAccumulator.getAccumulator().startTimeMs > timeLimitMs){
302
                        run = false;
303
                        LOGGER.warn("TIMEOUT!");
304
                        planCallback.planNotFound();
305
                        break;
306
                    }
307
        			
308
        			commPerformer.performReceive();
309

  
310
                    if(search){
311
                        requestGlobalHeuristic.processMessages();
312
                        replyGlobalHeuristic.processMessages();
313
                    }
314

  
315
                    // let other threads run
316
                    Thread.yield();
317
	            }
318
        		
319
        		
320
        	}
321
        	
322
        	
323
        	if(System.currentTimeMillis() - DataAccumulator.getAccumulator().startTimeMs > timeLimitMs){
324
                run = false;
325
                LOGGER.warn("TIMEOUT!");
326
                planCallback.planNotFound();
327
                break;
328
            }
329

  
330
            //LOGGER.info(comm.getAddress() + " open:"+open.size());
331

  
332
            commPerformer.performReceive();
333

  
334
            if(search){
335
                requestGlobalHeuristic.processMessages();
336
                replyGlobalHeuristic.processMessages();
337
            }
338

  
339
            // let other threads run
340
            Thread.yield();
341

  
342
        } while(run);
343

  
344
        commPerformer.performClose();
345
    }
346

  
347
    /**
348
     * Distributed reconstruction of the plan
349
     * @param state
350
     * @param cost
351
     * @param initiator
352
     * @param plan
353
     */
354
    protected void reconstructPlan(SearchState state, List<String> globalPlan, String initiator, int solutionCost){
355
//		System.out.println(comm.getAddress() + " reconstruct " + state.hashCode() + " - " + plan);
356

  
357
        if(bestReconstructedPlanBy == null){
358
            bestReconstructedPlanBy = initiator;
359
            bestReconstructedPlanCost = solutionCost;
360
        }else{
361
            if(bestReconstructedPlanCost < solutionCost || bestReconstructedPlanBy.compareTo(initiator) < 0){
362
                LOGGER.info(comm.getAddress() +  " plan("+bestReconstructedPlanCost+") already reconstructed by "+initiator+" stop reconstruction of plan("+solutionCost+")");
363
                return;
364
            }
365
        }
366

  
367
        List<String> plan = new LinkedList<String>();
368
        ParentState lastState = state.reconstructPlan(plan);
369

  
370
        planCallback.partialPlanReconstructed(plan,initiator,solutionCost);
371

  
372
        plan.addAll(globalPlan);
373
        if(lastState.getParentActionOwner() == null){
374
            LOGGER.info(comm.getAddress() + " plan found " + state.hashCode() + " - " + plan);
375

  
376
            LOGGER.warn(comm.getAddress() + " send PLANNING_FINISHED!" );
377
            protocol.sendPlanningFinishedMessage();
378
            run = false;
379

  
380
            planCallback.planFound(plan);
381

  
382

  
383
        }else{
384

  
385
//        	LOGGER.info(comm.getAddress() + " send reconstruct msg " + state.hashCode() + " to " + lastState.getParentActionOwner());
386
            protocol.sendReconstructPlanMessage(new ReconstructPlanMessage(plan,lastState.hashCode(),initiator,solutionCost), lastState.getParentActionOwner());
387
        }
388
    }
389

  
390
    /**
391
     * Send reached state if reached by public action
392
     * @param state
393
     */
394
    public void sendState(final SearchState state){
395

  
396
        if(sentStates.containsKey(state.hashCode()))return;
397
        sentStates.put(state.hashCode(), state);
398

  
399
        StateMessage msg = new StateMessage(state.getValues(), state.getG(), state.getHeuristic());
400

  
401
        DataAccumulator.getAccumulator().searchMessages ++;
402
        DataAccumulator.getAccumulator().totalBytes += msg.getBytes();
403

  
404

  
405
//		protocol.sendStateMessage(msg,CommunicationChannelBroadcast.BROADCAST_ADDRESS);
406

  
407
        //send only to relevant agents
408
        Set<String> sentTo = new HashSet<String>();
409
        for(Action a : problem.getProjectedActions()){
410
            if(!sentTo.contains(a.getOwner()) && a.isApplicableIn(state)){
411
                protocol.sendStateMessage(msg,a.getOwner());
412
                sentTo.add(a.getOwner());
413
            }
414
        }
415
    }
416

  
417
    protected Set<SearchState> expand(SearchState state) {
418

  
419
//		LOGGER.info("expanding state with h=" + state.getHeuristic() + ", g+h=" + (state.getG()+state.getHeuristic()));
420

  
421
        Set<SearchState> result = new HashSet<SearchState>();
422

  
423
        for (Action action : problem.getMyActions()) {
424
            if (action.isApplicableIn(state)) {
425
                result.add(state.transformBy(action));
426
            }
427
        }
428

  
429
        return result;
430
    }
431

  
432
    protected boolean solutionFound(SearchState state) {
433
        if (state.unifiesWith(problem.goalSuperState)) {
434
            LOGGER.info("SOLUTION of cost "+state.getG()+" FOUND[" + problem.agent + "]: " + Arrays.toString(state.getValues()));
435

  
436
            LOGGER.info("LOCAL-OPEN-SIZE[" + problem.agent + "]" + openLocal.size());
437
            LOGGER.info("GLOBAL-OPEN-SIZE[" + problem.agent + "]" + openGlobal.size());
438
            LOGGER.info("CLOSED-SIZE[" + problem.agent + "]" + closed.size());
439

  
440
            return true;
441
        }
442

  
443
        return false;
444
    }
445

  
446
}
src/test/java/cz/agents/dimaptools/synchronous/TestGLDistributedAStar.java
1
package cz.agents.dimaptools.synchronous;
2

  
3
import org.junit.Test;
4

  
5
import cz.agents.alite.configurator.MapConfiguration;
6
import cz.agents.dimaptools.DIMAPWorldInterface;
7
import cz.agents.dimaptools.heuristic.HeuristicInterface;
8
import cz.agents.dimaptools.heuristic.relaxed.RecursiveDistributedRelaxationReplyHeuristic;
9
import cz.agents.dimaptools.heuristic.relaxed.RecursiveDistributedRelaxationRequestHeuristic;
10
import cz.agents.dimaptools.heuristic.relaxed.RelaxationHeuristic;
11
import cz.agents.dimaptools.heuristic.relaxed.evaluator.AddEvaluator;
12
import cz.agents.dimaptools.search.AbstractDistributedAStarTest;
13
import cz.agents.dimaptools.search.GlobalLocalDistributedBestFirstSearch;
14

  
15
public class TestGLDistributedAStar extends AbstractDistributedAStarTest {
16

  
17
	@Test
18
	public void test() {
19
//		testProblem("truck-crane-a2");
20
//		testProblem("logistics-a2");
21
//		testProblem("logistics-a4");
22
//		testProblem("deconfliction-a4");
23
//		testProblem("rovers-a4");
24
//		testProblem("sokoban-a1");
25
//		testProblem("sokoban-a2");
26
	}
27

  
28
	@Override
29
	public void runSearch(DIMAPWorldInterface world){
30
		GlobalLocalDistributedBestFirstSearch search = new GlobalLocalDistributedBestFirstSearch(world);
31

  
32
		HeuristicInterface local = new RelaxationHeuristic(world.getProblem(),new AddEvaluator(world.getProblem()));
33
		RecursiveDistributedRelaxationRequestHeuristic req = new RecursiveDistributedRelaxationRequestHeuristic(world, new AddEvaluator(world.getProblem()));
34
		RecursiveDistributedRelaxationReplyHeuristic rep = new RecursiveDistributedRelaxationReplyHeuristic(world, new AddEvaluator(world.getProblem()),req.getRequestProtocol());
35
		req.setReplyProtocol(rep.getReplyProtocol());
36

  
37
		search.plan(new MapConfiguration("localHeuristic",local,"requestGlobalHeuristic",req,"replyGlobalHeuristic",rep), searchCallback);
38
	}
39

  
40
}

Also available in: Unified diff