Revision 173:853af8d96cfe

View differences:

src/main/java/cz/agents/dimaptools/search/SyncDistributedBestFirstSearch.java
1
package cz.agents.dimaptools.search;
2

  
3
import gnu.trove.TIntHashSet;
4
import gnu.trove.TIntObjectHashMap;
5

  
6
import java.util.Arrays;
7
import java.util.HashSet;
8
import java.util.LinkedList;
9
import java.util.List;
10
import java.util.Set;
11
import java.util.concurrent.PriorityBlockingQueue;
12

  
13
import org.apache.log4j.Logger;
14

  
15
import cz.agents.alite.communication.CommunicationPerformer;
16
import cz.agents.alite.communication.Communicator;
17
import cz.agents.alite.configurator.ConfigurationInterface;
18
import cz.agents.dimaptools.DIMAPWorldInterface;
19
import cz.agents.dimaptools.communication.message.PlanningFinishedMessage;
20
import cz.agents.dimaptools.communication.message.ReconstructPlanMessage;
21
import cz.agents.dimaptools.communication.message.StateMessage;
22
import cz.agents.dimaptools.communication.protocol.DistributedSearchProtocol;
23
import cz.agents.dimaptools.experiment.DataAccumulator;
24
import cz.agents.dimaptools.heuristic.HeuristicInterface;
25
import cz.agents.dimaptools.heuristic.HeuristicInterface.HeuristicComputedCallback;
26
import cz.agents.dimaptools.heuristic.HeuristicResult;
27
import cz.agents.dimaptools.heuristic.goalsat.GoalSatHeuristic;
28
import cz.agents.dimaptools.model.Action;
29
import cz.agents.dimaptools.model.Problem;
30

  
31
/**
32
 * Simple implementation of distributed Best-First Search with deferred heuristic evaluation.
33
 * config:
34
 * 	"heuristic" - implementation of HeuristicInterface used to guide the search
35
 * @author stolba
36
 *
37
 */
38
public class SyncDistributedBestFirstSearch implements SearchInterface {
39

  
40
    private static final Logger LOGGER = Logger.getLogger(SyncDistributedBestFirstSearch.class);
41

  
42
    protected final Problem problem;
43
    private final PriorityBlockingQueue<SearchState> open = new PriorityBlockingQueue<SearchState>();
44
    protected final TIntHashSet closed = new TIntHashSet();
45

  
46
    private HeuristicInterface heuristic;
47
    private HeuristicInterface requestHeuristic;
48
    protected SearchCallback planCallback;
49

  
50

  
51
    protected final Communicator comm;
52
    protected final CommunicationPerformer commPerformer;
53
    private final TIntObjectHashMap sentStates = new TIntObjectHashMap();
54

  
55
    private final DistributedSearchProtocol protocol;
56

  
57
    private String bestReconstructedPlanBy = null;
58
    private int bestReconstructedPlanCost;
59

  
60
    protected long timeLimitMs = Long.MAX_VALUE;
61

  
62
    protected volatile boolean run = true;
63
    protected volatile boolean search = true;
64

  
65
	private boolean heuristicComputed;
66

  
67

  
68

  
69

  
70
    public SyncDistributedBestFirstSearch(DIMAPWorldInterface world) {
71
        this(world,Long.MAX_VALUE);
72
    }
73

  
74
    public SyncDistributedBestFirstSearch(DIMAPWorldInterface world,long timeLimitMs) {
75
        this.problem = world.getProblem();
76
        this.comm = world.getCommunicator();
77
        commPerformer = world.getCommPerformer();
78
        this.timeLimitMs = timeLimitMs;
79

  
80
        heuristic = new GoalSatHeuristic(problem.goalSuperState);
81
        requestHeuristic = heuristic;
82

  
83
        protocol = new DistributedSearchProtocol(comm, world.getAgentName(), world.getEncoder()) {
84

  
85
            @Override
86
            public void receiveStateMessage(StateMessage sm, String sender) {
87
                if (!closed.contains(sm.getHash())) {
88

  
89
                    if(LOGGER.isDebugEnabled())LOGGER.debug(comm.getAddress() + " receive state " + Arrays.toString(sm.getValues()));
90

  
91
                    addReceivedState(sm, sender);
92

  
93
                }
94
            }
95

  
96
            @Override
97
            public void receiveReconstructPlanMessage(ReconstructPlanMessage rpm) {
98
                SearchState state = (SearchState)sentStates.get(rpm.getLastStateHash());
99

  
100
//            	LOGGER.info(comm.getAddress() + " receive reconstruct msg " + state.hashCode() );
101

  
102
                reconstructPlan(state, rpm.getPlan(),rpm.getInitiatorID(),rpm.getSolutionCost());
103

  
104
//            	run = false;
105
                search = false;
106
            }
107

  
108
            @Override
109
            public void receivePlanningFinishedMessage(PlanningFinishedMessage msg) {
110

  
111
                LOGGER.warn(comm.getAddress() + " receive PLANNING_FINISHED!" );
112

  
113

  
114
                run = false;
115

  
116
            }
117
        };
118

  
119

  
120
    }
121

  
122

  
123
    protected void addReceivedState(StateMessage sm, String sender){
124
        open.add(new SearchState(problem.initState.getDomain(),sm,sender));
125
    }
126

  
127

  
128

  
129
    @Override
130
    public void plan(ConfigurationInterface config, SearchCallback planCallback) {
131

  
132
        if(config.containsKey("heuristic")){
133
            heuristic = (HeuristicInterface) config.getObject("heuristic");
134
        }
135

  
136
        if(config.containsKey("requestHeuristic")){
137
            requestHeuristic = (HeuristicInterface) config.getObject("requestHeuristic");
138
        }else{
139
            requestHeuristic = heuristic;
140
        }
141

  
142

  
143
        this.planCallback = planCallback;
144

  
145
        open.add(new SearchState(problem.initState));
146

  
147
        do {
148
            if(!open.isEmpty() && search) {
149
                final SearchState state = open.poll();
150

  
151
                if (state != null && !closed.contains(state.hashCode())) {
152

  
153
                    closed.add(state.hashCode());
154

  
155
                    if (solutionFound(state)){
156
                        if (state.wasExpandedByMe(problem.agent)) {
157
                            long time =  System.currentTimeMillis() - DataAccumulator.getAccumulator().startTimeMs;
158
                            LOGGER.warn(" by me ("+problem.agent+") - " + time);
159

  
160
                            List<String> plan = new LinkedList<String>();
161
                            reconstructPlan(state, plan, problem.agent, state.getG());
162

  
163
//	                    	return;
164
                            search = false;
165

  
166
                        }
167
//	                    else {
168
//	                    	LOGGER.warn(" not by me ("+problem.agent+")");
169
////	                    	planCallback.planFoundByOther();
170
//	                    }
171

  
172

  
173

  
174
                    }
175

  
176
//		        	heurCount ++;
177
//		        	LOGGER.info(comm.getAddress() + " HEUR++ " + heurCount);
178

  
179
                    heuristicComputed = false;
180
                    
181
                    heuristic.getHeuristic(state, new HeuristicComputedCallback(){
182

  
183
                        @Override
184
                        public void heuristicComputed(HeuristicResult result) {
185
//	                    	heurCount --;
186
//	                    	LOGGER.info(comm.getAddress() + " HEUR-- " + heurCount + ", open:" + open.size());
187

  
188
                            if(result.getValue() >= HeuristicInterface.LARGE_HEURISTIC){
189
                                LOGGER.info(comm.getAddress() + " LARGE_HEURISTIC"+ ", open:" + open.size() + ", state: "+state);
190
                                return;
191
                            }
192
                            state.setHeuristics(result.getValue());
193

  
194
                            if (state.wasReachedByPublicAction()) {
195
                                sendState(state);
196
                            }
197

  
198
                            DataAccumulator.getAccumulator().expandedStates++;
199

  
200
                            Set<SearchState> expandedStates = expand(state);
201
                            open.addAll(expandedStates);
202
                            
203
                            heuristicComputed = true;
204
                        }
205
                    });
206

  
207
                }
208

  
209
                if(System.currentTimeMillis() - DataAccumulator.getAccumulator().startTimeMs > timeLimitMs){
210
                    run = false;
211
                    LOGGER.warn("TIMEOUT!");
212
                    planCallback.planNotFound();
213
                    break;
214
                }
215
                
216
//	        LOGGER.info("waiting for heuristic...");
217
	        while(!heuristicComputed){
218
				commPerformer.performReceive();
219
				
220
				if(search){
221
					heuristic.processMessages();
222
					requestHeuristic.processMessages();
223
				}
224
			}
225
//	        LOGGER.info("... computed");
226
            }
227

  
228
            //LOGGER.info(comm.getAddress() + " open:"+open.size());
229

  
230
            commPerformer.performReceive();
231

  
232
            if(search){
233
                heuristic.processMessages();
234
                requestHeuristic.processMessages();
235
            }
236

  
237
            // let other threads run
238
            Thread.yield();
239

  
240
        } while(run);
241

  
242
        commPerformer.performClose();
243
    }
244

  
245
    /**
246
     * Distributed reconstruction of the plan
247
     * @param state
248
     * @param cost
249
     * @param initiator
250
     * @param plan
251
     */
252
    protected void reconstructPlan(SearchState state, List<String> globalPlan, String initiator, int solutionCost){
253
//		System.out.println(comm.getAddress() + " reconstruct " + state.hashCode() + " - " + plan);
254

  
255
        if(bestReconstructedPlanBy == null){
256
            bestReconstructedPlanBy = initiator;
257
            bestReconstructedPlanCost = solutionCost;
258
        }else{
259
            if(bestReconstructedPlanCost < solutionCost || bestReconstructedPlanBy.compareTo(initiator) < 0){
260
                LOGGER.info(comm.getAddress() +  " plan("+bestReconstructedPlanCost+") already reconstructed by "+initiator+" stop reconstruction of plan("+solutionCost+")");
261
                return;
262
            }
263
        }
264

  
265
        List<String> plan = new LinkedList<String>();
266
        ParentState lastState = state.reconstructPlan(plan);
267

  
268
        planCallback.partialPlanReconstructed(plan,initiator,solutionCost);
269

  
270
        plan.addAll(globalPlan);
271
        if(lastState.getParentActionOwner() == null){
272
            LOGGER.info(comm.getAddress() + " plan found " + state.hashCode() + " - " + plan);
273

  
274
            LOGGER.warn(comm.getAddress() + " send PLANNING_FINISHED!" );
275
            protocol.sendPlanningFinishedMessage();
276
            run = false;
277

  
278
            planCallback.planFound(plan);
279

  
280

  
281
        }else{
282

  
283
//        	LOGGER.info(comm.getAddress() + " send reconstruct msg " + state.hashCode() + " to " + lastState.getParentActionOwner());
284
            protocol.sendReconstructPlanMessage(new ReconstructPlanMessage(plan,lastState.hashCode(),initiator,solutionCost), lastState.getParentActionOwner());
285
        }
286
    }
287

  
288
    /**
289
     * Send reached state if reached by public action
290
     * @param state
291
     */
292
    public void sendState(final SearchState state){
293

  
294
        if(sentStates.containsKey(state.hashCode()))return;
295
        sentStates.put(state.hashCode(), state);
296

  
297
        StateMessage msg = new StateMessage(state.getValues(), state.getG(), state.getHeuristic());
298

  
299
        DataAccumulator.getAccumulator().searchMessages ++;
300
        DataAccumulator.getAccumulator().totalBytes += msg.getBytes();
301

  
302

  
303
//		protocol.sendStateMessage(msg,CommunicationChannelBroadcast.BROADCAST_ADDRESS);
304

  
305
        //send only to relevant agents
306
        Set<String> sentTo = new HashSet<String>();
307
        for(Action a : problem.getProjectedActions()){
308
            if(!sentTo.contains(a.getOwner()) && a.isApplicableIn(state)){
309
                protocol.sendStateMessage(msg,a.getOwner());
310
                sentTo.add(a.getOwner());
311
            }
312
        }
313
    }
314

  
315
    protected Set<SearchState> expand(SearchState state) {
316

  
317
//		LOGGER.info("expanding state with h=" + state.getHeuristic() + ", g+h=" + (state.getG()+state.getHeuristic()));
318

  
319
        Set<SearchState> result = new HashSet<SearchState>();
320

  
321
        for (Action action : problem.getMyActions()) {
322
            if (action.isApplicableIn(state)) {
323
                result.add(state.transformBy(action));
324
            }
325
        }
326

  
327
        return result;
328
    }
329

  
330
    protected boolean solutionFound(SearchState state) {
331
        if (state.unifiesWith(problem.goalSuperState)) {
332
            LOGGER.info("SOLUTION of cost "+state.getG()+" FOUND[" + problem.agent + "]: " + Arrays.toString(state.getValues()));
333

  
334
            LOGGER.info("OPEN-SIZE[" + problem.agent + "]" + open.size());
335
            LOGGER.info("CLOSED-SIZE[" + problem.agent + "]" + closed.size());
336

  
337
            return true;
338
        }
339

  
340
        return false;
341
    }
342

  
343
}
src/main/java/cz/agents/dimaptools/search/SynchronousDistributedAStar.java
1
package cz.agents.dimaptools.search;
2

  
3
import java.util.LinkedList;
4
import java.util.List;
5
import java.util.Set;
6
import java.util.concurrent.PriorityBlockingQueue;
7

  
8
import org.apache.log4j.Logger;
9

  
10
import cz.agents.alite.configurator.ConfigurationInterface;
11
import cz.agents.dimaptools.DIMAPWorldInterface;
12
import cz.agents.dimaptools.experiment.DataAccumulator;
13
import cz.agents.dimaptools.heuristic.HeuristicInterface;
14
import cz.agents.dimaptools.heuristic.HeuristicInterface.HeuristicComputedCallback;
15
import cz.agents.dimaptools.heuristic.HeuristicResult;
16

  
17
/**
18
 * Simple implementation of distributed A* (or Best-First Search depending on used heuristic) with deferred heuristic evaluation.
19
 * config:
20
 * 	"heuristic" - implementation of HeuristicInterface used to guide the search
21
 * @author stolba
22
 *
23
 */
24
public class SynchronousDistributedAStar extends DistributedBestFirstSearch {
25

  
26
	private static final Logger LOGGER = Logger.getLogger(SynchronousDistributedAStar.class);
27

  
28
	
29
    private final PriorityBlockingQueue<SearchState> open = new PriorityBlockingQueue<SearchState>();
30
    
31

  
32
    private HeuristicInterface heuristic;
33
    private HeuristicInterface requestHeuristic;
34

  
35
    private boolean heuristicComputed = false;
36

  
37

  
38
    public SynchronousDistributedAStar(DIMAPWorldInterface world) {
39
    	this(world,Long.MAX_VALUE);
40
    }
41

  
42
	public SynchronousDistributedAStar(DIMAPWorldInterface world,long timeLimitMs) {
43
		super(world, timeLimitMs);
44

  
45
	}
46

  
47

  
48

  
49
	@Override
50
	public void plan(ConfigurationInterface config, SearchCallback planCallback) {
51

  
52
		if(config.containsKey("heuristic")){
53
			heuristic = (HeuristicInterface) config.getObject("heuristic");
54
		}
55

  
56
		if(config.containsKey("requestHeuristic")){
57
			requestHeuristic = (HeuristicInterface) config.getObject("requestHeuristic");
58
		}else{
59
			requestHeuristic = heuristic;
60
		}
61

  
62

  
63
		this.planCallback = planCallback;
64

  
65
		open.add(new SearchState(problem.initState));
66

  
67
		do{
68
			if(!open.isEmpty() && search){
69
				final SearchState state;
70

  
71
				
72
				state = open.poll();
73

  
74
		        if (state != null && !closed.contains(state.hashCode())) {
75

  
76
		        	closed.add(state.hashCode());
77

  
78
		        	if (solutionFound(state)){
79
	                    if (state.wasExpandedByMe(problem.agent)) {
80
	                    	long time =  System.currentTimeMillis() - DataAccumulator.getAccumulator().startTimeMs;
81
	                    	LOGGER.warn(" by me ("+problem.agent+") - " + time);
82

  
83
	                    	List<String> plan = new LinkedList<String>();
84
	                    	reconstructPlan(state, plan, problem.agent, state.getG());
85

  
86
//	                    	return;
87
	                    	search = false;
88

  
89
	                    }
90

  
91

  
92

  
93
	                }
94

  
95
		        	heuristicComputed = false;
96
		        	
97
		        	heuristic.getHeuristic(state, new HeuristicComputedCallback(){
98

  
99
	                    @Override
100
	                    public void heuristicComputed(HeuristicResult result) {
101

  
102
	                    	if(result.getValue() >= HeuristicInterface.LARGE_HEURISTIC){
103
	                    		LOGGER.info(comm.getAddress() + " LARGE_HEURISTIC"+ ", open:" + open.size());
104
	                    		return;
105
	                    	}
106
	                        state.setHeuristics(result.getValue());
107

  
108
	                        if (state.wasReachedByPublicAction()) {
109
	                            sendState(state);
110
	                        }
111

  
112
	                        DataAccumulator.getAccumulator().expandedStates ++;
113

  
114
	                        Set<SearchState> expandedStates = expand(state);
115
	                        open.addAll(expandedStates);
116
	                        
117
	                        heuristicComputed = true;
118
	                    }
119
	                });
120

  
121
		        }
122

  
123
		        if(System.currentTimeMillis() - DataAccumulator.getAccumulator().startTimeMs > timeLimitMs){
124
					run = false;
125
					LOGGER.warn("TIMEOUT!");
126
	            	planCallback.planNotFound();
127
	            	break;
128
				}
129
			}
130
			
131
//			LOGGER.info("waiting for heuristic...");
132
			
133
			commPerformer.performReceive();
134
			
135
			if(search){
136
				heuristic.processMessages();
137
				requestHeuristic.processMessages();
138
			}
139

  
140
			while(!heuristicComputed){
141
				commPerformer.performReceive();
142
				
143
				if(search){
144
					heuristic.processMessages();
145
					requestHeuristic.processMessages();
146
				}
147
			}
148

  
149
//			LOGGER.info("... computed");
150

  
151
		}while(run);
152
		
153
		commPerformer.performClose();
154
	}
155

  
156
	
157

  
158
}
src/test/java/cz/agents/dimaptools/synchronous/TestSynchronousDistributedAStar.java
8 8
import cz.agents.dimaptools.heuristic.relaxed.RecursiveDistributedRelaxationRequestHeuristic;
9 9
import cz.agents.dimaptools.heuristic.relaxed.evaluator.FFEvaluator;
10 10
import cz.agents.dimaptools.search.AbstractDistributedAStarTest;
11
import cz.agents.dimaptools.search.SynchronousDistributedAStar;
11
import cz.agents.dimaptools.search.SyncDistributedBestFirstSearch;
12 12

  
13 13
public class TestSynchronousDistributedAStar extends AbstractDistributedAStarTest {
14 14

  
15 15
	@Test
16 16
	public void test() {
17 17
//		testProblem("truck-crane-a2");
18
		testProblem("logistics-a2");
18
//		testProblem("logistics-a2");
19 19
//		testProblem("logistics-a4");
20
//		testProblem("deconfliction-a4");
20
		testProblem("deconfliction-a4");
21 21
//		testProblem("rovers-a4");
22 22
//		testProblem("sokoban-a1");
23 23
//		testProblem("sokoban-a2");
......
25 25

  
26 26
	@Override
27 27
	public void runSearch(DIMAPWorldInterface world){
28
		SynchronousDistributedAStar search = new SynchronousDistributedAStar(world);
28
		SyncDistributedBestFirstSearch search = new SyncDistributedBestFirstSearch(world);
29 29

  
30 30
		RecursiveDistributedRelaxationRequestHeuristic req = new RecursiveDistributedRelaxationRequestHeuristic(world, new FFEvaluator(world.getProblem()));
31 31
		RecursiveDistributedRelaxationReplyHeuristic rep = new RecursiveDistributedRelaxationReplyHeuristic(world, new FFEvaluator(world.getProblem()),req.getRequestProtocol());

Also available in: Unified diff