Revision 190:ec67a401a5a3

View differences:

src/main/java/cz/agents/dimaptools/search/AsyncDistributedBestFirstSearch.java
1
package cz.agents.dimaptools.search;
2

  
3
import gnu.trove.TIntHashSet;
4
import gnu.trove.TIntObjectHashMap;
5

  
6
import java.util.Arrays;
7
import java.util.HashSet;
8
import java.util.LinkedList;
9
import java.util.List;
10
import java.util.Set;
11
import java.util.concurrent.BlockingQueue;
12
import java.util.concurrent.PriorityBlockingQueue;
13

  
14
import org.apache.log4j.Logger;
15

  
16
import cz.agents.alite.communication.CommunicationPerformer;
17
import cz.agents.alite.communication.Communicator;
18
import cz.agents.alite.configurator.ConfigurationInterface;
19
import cz.agents.dimaptools.DIMAPWorldInterface;
20
import cz.agents.dimaptools.communication.message.PlanningFinishedMessage;
21
import cz.agents.dimaptools.communication.message.ReconstructPlanMessage;
22
import cz.agents.dimaptools.communication.message.StateMessage;
23
import cz.agents.dimaptools.communication.protocol.DistributedSearchProtocol;
24
import cz.agents.dimaptools.experiment.DataAccumulator;
25
import cz.agents.dimaptools.heuristic.HeuristicInterface;
26
import cz.agents.dimaptools.heuristic.HeuristicInterface.HeuristicComputedCallback;
27
import cz.agents.dimaptools.heuristic.HeuristicResult;
28
import cz.agents.dimaptools.heuristic.goalsat.GoalSatHeuristic;
29
import cz.agents.dimaptools.model.Action;
30
import cz.agents.dimaptools.model.Problem;
31

  
32
/**
33
 * Simple implementation of distributed Best-First Search with deferred heuristic evaluation.
34
 * config:
35
 * 	"heuristic" - implementation of HeuristicInterface used to guide the search
36
 * @author stolba
37
 *
38
 */
39
public class AsyncDistributedBestFirstSearch implements SearchInterface, Runnable {
40

  
41
    private static final Logger LOGGER = Logger.getLogger(AsyncDistributedBestFirstSearch.class);
42

  
43
    protected final Problem problem;
44
    protected final PriorityBlockingQueue<SearchState> open = new PriorityBlockingQueue<SearchState>();
45
    protected final TIntHashSet closed = new TIntHashSet();
46

  
47
    protected HeuristicInterface heuristic;
48
    protected HeuristicInterface requestHeuristic;
49
    protected SearchCallback planCallback;
50

  
51
    protected boolean recomputeHeuristicOnReceive = false;
52

  
53
    protected final Communicator comm;
54
    protected final CommunicationPerformer commPerformer;
55
    protected final TIntObjectHashMap sentStates = new TIntObjectHashMap();
56

  
57
    protected final DistributedSearchProtocol protocol;
58

  
59
    protected String bestReconstructedPlanBy = null;
60
    protected int bestReconstructedPlanCost = Integer.MAX_VALUE;
61

  
62
    protected long timeLimitMs = Long.MAX_VALUE;
63

  
64
    protected volatile boolean run = true;
65
    protected volatile boolean search = true;
66
    protected volatile boolean hfinished = true;
67

  
68
	protected int minH = Integer.MAX_VALUE;
69
	protected int maxG = 0;
70
	
71
	protected BlockingQueue<HeuristicRequest> statesToComputeH = new PriorityBlockingQueue<HeuristicRequest>();
72

  
73

  
74

  
75

  
76
    public AsyncDistributedBestFirstSearch(DIMAPWorldInterface world) {
77
        this(world,Long.MAX_VALUE);
78
    }
79

  
80
    public AsyncDistributedBestFirstSearch(DIMAPWorldInterface world,long timeLimitMs) {
81
        this.problem = world.getProblem();
82
        this.comm = world.getCommunicator();
83
        commPerformer = world.getCommPerformer();
84
        this.timeLimitMs = timeLimitMs;
85

  
86
        heuristic = new GoalSatHeuristic(problem.goalSuperState);
87
        requestHeuristic = heuristic;
88

  
89
        protocol = new DistributedSearchProtocol(comm, world.getAgentName(), world.getEncoder()) {
90

  
91
            @Override
92
            public void receiveStateMessage(StateMessage sm, String sender) {
93
                if (!closed.contains(sm.getHash())) {
94

  
95
                    if(LOGGER.isDebugEnabled())LOGGER.debug(comm.getAddress() + " receive state " + Arrays.toString(sm.getValues()));
96

  
97
                    addReceivedState(sm, sender);
98

  
99
                }
100
            }
101

  
102
            @Override
103
            public void receiveReconstructPlanMessage(ReconstructPlanMessage rpm) {
104
                SearchState state = (SearchState)sentStates.get(rpm.getLastStateHash());
105

  
106
//            	LOGGER.info(comm.getAddress() + " receive reconstruct msg " + state.hashCode() );
107

  
108
                reconstructPlan(state, rpm.getPlan(),rpm.getInitiatorID(),rpm.getSolutionCost());
109

  
110
//            	run = false;
111
                search = false;
112
            }
113

  
114
            @Override
115
            public void receivePlanningFinishedMessage(PlanningFinishedMessage msg) {
116

  
117
                LOGGER.warn(comm.getAddress() + " receive PLANNING_FINISHED!" );
118

  
119

  
120
                run = false;
121

  
122
            }
123
        };
124
        
125
        
126
        Thread heurThread = new Thread(this);
127
        heurThread.setName(world.getAgentName()+"heur");
128
        heurThread.start();
129

  
130

  
131
    }
132

  
133

  
134
    protected void addReceivedState(final StateMessage sm, String sender){
135
    	if(recomputeHeuristicOnReceive){ //should be probably done for projected heuristics
136
    		final SearchState newState = new SearchState(problem.initState.getDomain(),sm,sender);
137
    		
138
    		getHeuristic(newState, new HeuristicComputedCallback() {
139
				
140
				@Override
141
				public void heuristicComputed(HeuristicResult result) {
142
					newState.setHeuristics(Math.max(result.getValue(), sm.getH()));
143
					open.add(newState);
144
					hfinished = true;
145
				}
146
			});
147
    		
148
    	}else{
149
    		
150
    		open.add(new SearchState(problem.initState.getDomain(),sm,sender));
151
    		
152
    	}
153
    }
154

  
155
    
156
    protected void readConfiguration(ConfigurationInterface config){
157
    	if(config.containsKey("heuristic")){
158
            heuristic = (HeuristicInterface) config.getObject("heuristic");
159
        }
160

  
161
        if(config.containsKey("requestHeuristic")){
162
            requestHeuristic = (HeuristicInterface) config.getObject("requestHeuristic");
163
        }else{
164
            requestHeuristic = heuristic;
165
        }
166
        
167
        recomputeHeuristicOnReceive = config.getBoolean("recomputeHeuristicOnReceive", false);
168
    }
169

  
170

  
171
    @Override
172
    public void plan(ConfigurationInterface config, SearchCallback planCallback) {
173

  
174
    	readConfiguration(config);
175

  
176
        this.planCallback = planCallback;
177

  
178
        open.add(new SearchState(problem.initState));
179

  
180
        do {
181
            if(!open.isEmpty() && search) {
182
                final SearchState state = open.poll();
183
                
184
                if(state.getHeuristic() < minH){
185
	              	minH = state.getHeuristic();
186
	               	if(LOGGER.isInfoEnabled())LOGGER.info(problem.agent + ": Reached new minimal [" + state.getParentActionOwner() + "] /h/: " + minH);
187
	            }
188
	            if(state.getG() > maxG){
189
	               	maxG = state.getG();
190
	               	if(LOGGER.isInfoEnabled())LOGGER.info(problem.agent + ": Reached new maximal [" + state.getParentActionOwner() + "] /g/: " + maxG);
191
	            }
192

  
193
                if (state != null && !closed.contains(state.hashCode())) {
194

  
195
                    closed.add(state.hashCode());
196

  
197
                    if (solutionFound(state)){
198
                        if (state.wasExpandedByMe(problem.agent)) {
199
                            long time =  System.currentTimeMillis() - DataAccumulator.getAccumulator().startTimeMs;
200
                            LOGGER.warn(" by me ("+problem.agent+") - " + time);
201

  
202
                            List<String> plan = new LinkedList<String>();
203
                            reconstructPlan(state, plan, problem.agent, state.getG());
204

  
205
//	                    	return;
206
                            search = false;
207

  
208
                        }
209
//	                    else {
210
//	                    	LOGGER.warn(" not by me ("+problem.agent+")");
211
////	                    	planCallback.planFoundByOther();
212
//	                    }
213

  
214

  
215

  
216
                    }
217

  
218
//		        	heurCount ++;
219
//		        	LOGGER.info(comm.getAddress() + " HEUR++ " + heurCount);
220

  
221
                    getHeuristic(state, new HeuristicComputedCallback(){
222

  
223
                        @Override
224
                        public void heuristicComputed(HeuristicResult result) {
225
//	                    	heurCount --;
226
//	                    	LOGGER.info(comm.getAddress() + " HEUR-- " + heurCount + ", open:" + open.size());
227

  
228
                            if(result.getValue() >= HeuristicInterface.LARGE_HEURISTIC){
229
                                LOGGER.info(comm.getAddress() + " LARGE_HEURISTIC"+ ", open:" + open.size() + ", state: "+state);
230
                                return;
231
                            }
232
                            state.setHeuristics(result.getValue());
233

  
234
                            if (state.wasReachedByPublicAction()) {
235
                                sendState(state);
236
                            }
237

  
238
                            DataAccumulator.getAccumulator().expandedStates++;
239

  
240
                            Set<SearchState> expandedStates = expand(state);
241
                            open.addAll(expandedStates);
242
                            hfinished = true;
243
                        }
244
                    });
245

  
246
                }
247

  
248
                if(System.currentTimeMillis() - DataAccumulator.getAccumulator().startTimeMs > timeLimitMs){
249
                    run = false;
250
                    LOGGER.warn("TIMEOUT!");
251
                    planCallback.planNotFound();
252
                    break;
253
                }
254
            }
255

  
256
            //LOGGER.info(comm.getAddress() + " open:"+open.size());
257

  
258
            commPerformer.performReceive();
259

  
260
            // let other threads run
261
            Thread.yield();
262

  
263
        } while(run);
264

  
265
        commPerformer.performClose();
266
    }
267

  
268
    /**
269
     * Distributed reconstruction of the plan
270
     * @param state
271
     * @param cost
272
     * @param initiator
273
     * @param plan
274
     */
275
    protected void reconstructPlan(SearchState state, List<String> globalPlan, String initiator, int solutionCost){
276
//		System.out.println(comm.getAddress() + " reconstruct " + state.hashCode() + " - " + plan);
277

  
278
        if(bestReconstructedPlanBy == null){
279
            bestReconstructedPlanBy = initiator;
280
            bestReconstructedPlanCost = solutionCost;
281
        }else{
282
            if(bestReconstructedPlanCost < solutionCost || bestReconstructedPlanBy.compareTo(initiator) < 0){
283
                LOGGER.info(comm.getAddress() +  " plan("+bestReconstructedPlanCost+") already reconstructed by "+initiator+" stop reconstruction of plan("+solutionCost+")");
284
                return;
285
            }
286
        }
287

  
288
        List<String> plan = new LinkedList<String>();
289
        ParentState lastState = state.reconstructPlan(plan);
290

  
291
        planCallback.partialPlanReconstructed(plan,initiator,solutionCost);
292

  
293
        plan.addAll(globalPlan);
294
        if(lastState.getParentActionOwner() == null){
295
            LOGGER.info(comm.getAddress() + " plan found " + state.hashCode() + " - " + plan);
296

  
297
            LOGGER.warn(comm.getAddress() + " send PLANNING_FINISHED!" );
298
            protocol.sendPlanningFinishedMessage();
299
            run = false;
300

  
301
            planCallback.planFound(plan);
302

  
303

  
304
        }else{
305

  
306
//        	LOGGER.info(comm.getAddress() + " send reconstruct msg " + state.hashCode() + " to " + lastState.getParentActionOwner());
307
            protocol.sendReconstructPlanMessage(new ReconstructPlanMessage(plan,lastState.hashCode(),initiator,solutionCost), lastState.getParentActionOwner());
308
        }
309
    }
310

  
311
    /**
312
     * Send reached state if reached by public action
313
     * @param state
314
     */
315
    public void sendState(final SearchState state){
316

  
317
        if(sentStates.containsKey(state.hashCode()))return;
318
        sentStates.put(state.hashCode(), state);
319

  
320
        StateMessage msg = new StateMessage(state.getValues(), state.getG(), state.getHeuristic());
321

  
322
        DataAccumulator.getAccumulator().searchMessages ++;
323
        DataAccumulator.getAccumulator().totalBytes += msg.getBytes();
324

  
325

  
326
//		protocol.sendStateMessage(msg,CommunicationChannelBroadcast.BROADCAST_ADDRESS);
327

  
328
        //send only to relevant agents
329
        Set<String> sentTo = new HashSet<String>();
330
        for(Action a : problem.getProjectedActions()){
331
            if(!sentTo.contains(a.getOwner()) && a.isApplicableIn(state)){
332
                protocol.sendStateMessage(msg,a.getOwner());
333
                sentTo.add(a.getOwner());
334
            }
335
        }
336
    }
337

  
338
    protected Set<SearchState> expand(SearchState state) {
339

  
340
//		LOGGER.info("expanding state with h=" + state.getHeuristic() + ", g+h=" + (state.getG()+state.getHeuristic()));
341

  
342
        Set<SearchState> result = new HashSet<SearchState>();
343

  
344
        for (Action action : problem.getMyActions()) {
345
            if (action.isApplicableIn(state)) {
346
                result.add(state.transformBy(action));
347
            }
348
        }
349

  
350
        return result;
351
    }
352

  
353
    protected boolean solutionFound(SearchState state) {
354
        if (state.unifiesWith(problem.goalSuperState)) {
355
            LOGGER.info("SOLUTION of cost "+state.getG()+" FOUND[" + problem.agent + "]: " + Arrays.toString(state.getValues()));
356

  
357
            LOGGER.info("OPEN-SIZE[" + problem.agent + "]" + open.size());
358
            LOGGER.info("CLOSED-SIZE[" + problem.agent + "]" + closed.size());
359

  
360
            return true;
361
        }
362

  
363
        return false;
364
    }
365

  
366
	@Override
367
	public void run() {
368
		LOGGER.info("RUN heuristic computation in separate thread!");
369
		
370
		while(search){
371
			if(hfinished && !statesToComputeH.isEmpty()){
372
				HeuristicRequest request = statesToComputeH.poll();
373
				hfinished = false;
374
				heuristic.getHeuristic(request.state, request.callback);
375
			}
376
			
377
			heuristic.processMessages();
378
            requestHeuristic.processMessages();
379

  
380
            // let other threads run
381
            Thread.yield();
382
		}
383
	}
384
	
385
	public void getHeuristic(SearchState state,HeuristicComputedCallback callback){
386
		statesToComputeH.add(new HeuristicRequest(state, callback,statesToComputeH.size()+1));
387
	}
388
	
389
	private class HeuristicRequest implements Comparable<HeuristicRequest>{
390
		
391
		public HeuristicRequest(SearchState state,HeuristicComputedCallback callback,int priority) {
392
			super();
393
			this.state = state;
394
			this.callback = callback;
395
			this.priority = priority;
396
		}
397
		
398
		SearchState state;
399
		HeuristicComputedCallback callback;
400
		int priority;
401
		
402
		@Override
403
		public int compareTo(HeuristicRequest o) {
404
			return priority - o.priority;	//TODO: check
405
		}
406
	}
407

  
408
}
src/test/java/cz/agents/dimaptools/synchronous/TestAsynchronousDistributedAStar.java
1
package cz.agents.dimaptools.synchronous;
2

  
3
import org.junit.Test;
4

  
5
import cz.agents.alite.configurator.MapConfiguration;
6
import cz.agents.dimaptools.DIMAPWorldInterface;
7
import cz.agents.dimaptools.heuristic.relaxed.RecursiveDistributedRelaxationReplyHeuristic;
8
import cz.agents.dimaptools.heuristic.relaxed.RecursiveDistributedRelaxationRequestHeuristic;
9
import cz.agents.dimaptools.heuristic.relaxed.evaluator.FFEvaluator;
10
import cz.agents.dimaptools.search.AbstractDistributedAStarTest;
11
import cz.agents.dimaptools.search.AsyncDistributedBestFirstSearch;
12

  
13
public class TestAsynchronousDistributedAStar extends AbstractDistributedAStarTest {
14

  
15
	@Test
16
	public void test() {
17
//		testProblem("truck-crane-a2");
18
//		testProblem("logistics-a2");
19
//		testProblem("logistics-a4");
20
//		testProblem("deconfliction-a4");
21
//		testProblem("rovers-a4");
22
//		testProblem("sokoban-a1");
23
//		testProblem("sokoban-a2");
24
	}
25

  
26
	@Override
27
	public void runSearch(DIMAPWorldInterface world){
28
		AsyncDistributedBestFirstSearch search = new AsyncDistributedBestFirstSearch(world);
29

  
30
		RecursiveDistributedRelaxationRequestHeuristic req = new RecursiveDistributedRelaxationRequestHeuristic(world, new FFEvaluator(world.getProblem()),0);
31
		RecursiveDistributedRelaxationReplyHeuristic rep = new RecursiveDistributedRelaxationReplyHeuristic(world, new FFEvaluator(world.getProblem()),req.getRequestProtocol());
32
		req.setReplyProtocol(rep.getReplyProtocol());
33

  
34
		search.plan(new MapConfiguration("heuristic",req,"requestHeuristic",rep), searchCallback);
35
	}
36

  
37
}

Also available in: Unified diff