Revision 184:73ef92cdcb0e

View differences:

src/main/java/cz/agents/dimaptools/heuristic/relaxed/RecursiveDistributedRelaxationPersonalizedRequestHeuristic.java
1
package cz.agents.dimaptools.heuristic.relaxed;
2

  
3
import gnu.trove.TIntHashSet;
4
import gnu.trove.TIntObjectHashMap;
5

  
6
import java.util.HashMap;
7
import java.util.LinkedList;
8
import java.util.Map;
9

  
10
import org.apache.log4j.Logger;
11

  
12
import cz.agents.alite.communication.Communicator;
13
import cz.agents.dimaptools.DIMAPWorldInterface;
14
import cz.agents.dimaptools.communication.message.HeuristicReplyWithPublicActionsMessage;
15
import cz.agents.dimaptools.communication.message.HeuristicRequestMessage;
16
import cz.agents.dimaptools.communication.protocol.DistributedHeuristicReplyProtocol;
17
import cz.agents.dimaptools.communication.protocol.DistributedHeuristicRequestProtocol;
18
import cz.agents.dimaptools.experiment.DataAccumulator;
19
import cz.agents.dimaptools.experiment.Trace;
20
import cz.agents.dimaptools.heuristic.HeuristicInterface;
21
import cz.agents.dimaptools.heuristic.HeuristicResult;
22
import cz.agents.dimaptools.heuristic.relaxed.evaluator.EvaluatorInterface;
23
import cz.agents.dimaptools.model.Action;
24
import cz.agents.dimaptools.model.State;
25
import cz.agents.dimaptools.util.SharedProblemInfoProvider;
26

  
27
public class RecursiveDistributedRelaxationPersonalizedRequestHeuristic extends RelaxationHeuristic {
28

  
29
    private final Logger LOGGER = Logger.getLogger(RecursiveDistributedRelaxationPersonalizedRequestHeuristic.class);
30

  
31
    private final String id = "REQ";
32
    private final DistributedHeuristicRequestProtocol requestProtocol;
33
    private DistributedHeuristicReplyProtocol replyProtocol = null;
34

  
35
    private final Communicator comm;
36
    private final String agentName;
37
    private final int agentID;
38

  
39
    /**
40
     * Sent requests waiting for the reply
41
     */
42
    private TIntObjectHashMap requests = new TIntObjectHashMap();
43
    private HeuristicComputedCallback currentCallback;
44

  
45
    /**
46
     * Queue requests from the search which cannot be computed at the moment
47
     */
48
    private LinkedList<LocalHeuristicRequest> localRequests = new LinkedList<LocalHeuristicRequest>();
49

  
50
//	private int heuristicCounter = 0;
51

  
52
    private final int maxRecursionDepth;
53

  
54
    /**
55
     * Remember actions already requested
56
     */
57
    private TIntHashSet requestedActions = new TIntHashSet();
58
    
59
    
60
    private final Map<String,Boolean> shouldSendRequestsTo = new HashMap<>();
61

  
62

  
63

  
64

  
65
    public RecursiveDistributedRelaxationPersonalizedRequestHeuristic(DIMAPWorldInterface world,EvaluatorInterface evaluator, int requestThreshold) {
66
        this(world,evaluator,Integer.MAX_VALUE, requestThreshold);
67
    }
68

  
69
    public RecursiveDistributedRelaxationPersonalizedRequestHeuristic(DIMAPWorldInterface world,EvaluatorInterface evaluator,int maxRecursionDepth, int requestThreshold) {
70
        super(world.getProblem(),evaluator,true);
71
        
72
        comm = world.getCommunicator();
73
        agentName = world.getAgentName();
74
        agentID = world.getAgentID();
75
        
76
        SharedProblemInfoProvider provider = new SharedProblemInfoProvider(world,world.getNumberOfAgents());
77
        for(String agent : provider.getKnownAgents()){
78
			if(provider.getCoupling(agent) > requestThreshold){
79
				shouldSendRequestsTo.put(agent, false);
80
				if(LOGGER.isInfoEnabled())LOGGER.info("  " + world.getAgentName() + " will not send requests to " + agent + "(" + provider.getCoupling(agent) + ")");
81
			}else{
82
				shouldSendRequestsTo.put(agent, true);
83
				if(LOGGER.isInfoEnabled())LOGGER.info("  " + world.getAgentName() + " will send requests to " + agent + "(" + provider.getCoupling(agent) + ")");
84
			}
85
		}
86

  
87
        this.maxRecursionDepth = maxRecursionDepth > -1 ? maxRecursionDepth : Integer.MAX_VALUE;
88

  
89
        requestProtocol = new DistributedHeuristicRequestProtocol(
90
                world.getCommunicator(),
91
                world.getAgentName(),
92
                world.getEncoder()){
93

  
94
            @Override
95
            public void receiveHeuristicReplyWithPublicActionsMessage(HeuristicReplyWithPublicActionsMessage re, String sender) {
96
                if(LOGGER.isDebugEnabled())LOGGER.debug(requestProtocol.getAddress() + "("+id+")" + " handle reply from " + sender + ": " + re);
97
                processReply(re,sender);
98
            }
99

  
100
        };
101

  
102
    }
103

  
104
    public DistributedHeuristicRequestProtocol getRequestProtocol(){
105
        return requestProtocol;
106
    }
107

  
108
    public void setReplyProtocol(DistributedHeuristicReplyProtocol replyProtocol){
109
        this.replyProtocol = replyProtocol;
110
    }
111

  
112

  
113
    @Override
114
    public void getHeuristic(State state, HeuristicComputedCallback callback) {
115
        if(LOGGER.isDebugEnabled())LOGGER.debug(domain.agent + "("+id+")" + " get heuristic: " + domain.humanize(state.getValues()));
116

  
117
//		LOGGER.info(domain.agent + "("+id+")" + "Get H(" + (heuristicCounter ++) + "): requestHandler.queueSize():"+requestHandler.queueSize()+", replyHandler.queueSize():"+replyHandler.queueSize()+", requests.size():"+requests.size());
118

  
119
        //if waiting for some replies, queue the local request
120
        if(requests.size() > 0){
121
            localRequests.add(new LocalHeuristicRequest(state, callback));
122
//			LOGGER.info(agentName + "("+id+") localRequests: " + localRequests.size());
123
            return;
124
        }
125

  
126
        //build the EQ
127
//        LOGGER.info(agentName + "("+id+") compute h for: " + state.hashCode());
128

  
129
        currentCallback = callback;
130
        currentState = state;
131
        requestedActions.clear();
132

  
133
        buildGoalPropositions(problem.goalSuperState);
134
        setupExplorationQueue();
135
        setupExplorationQueueState(state);
136
        relaxedExploration();
137

  
138
        //if not waiting for replies, finish the heuristic
139
        if(requests.size() == 0){
140
//			LOGGER.info(domain.agent + "("+id+")" + " Computed H(" + (heuristicCounter --) + "): requestHandler.queueSize():"+requestHandler.queueSize()+", replyHandler.queueSize():"+replyHandler.queueSize()+", requests.size():"+requests.size());
141
            int totalCost = evaluator.getTotalCost(goalPropositions);
142
            HelpfulActions ha = totalCost < HeuristicInterface.LARGE_HEURISTIC ? evaluator.getHelpfulActions(state) : new HelpfulActions();
143
            callback.heuristicComputed(new HeuristicResult(totalCost,ha));
144
        }
145
    }
146

  
147
    /**
148
     * Override the default enqueue operation
149
     * @param p
150
     * @param cost
151
     * @param op
152
     **/
153
    public void enqueueIfNecessary(final Proposition p, int cost, final UnaryOperator op){
154
//		if(LOGGER.isDebugEnabled())LOGGER.debug(domain.agent + "("+id+")" + " enqueue if necessary ");
155

  
156

  
157
        //XXX: recursion depth is effectively never checked
158
        //final int recursionDepth = 0;
159

  
160
//		if(op != null)LOGGER.info(domain.agent + " ENQUEUE " + op.operatorsIndex + ", action:" + problem.getAction(op.actionHash));
161

  
162
        if(cost >= HeuristicInterface.LARGE_HEURISTIC){
163
            LOGGER.warn("IMPOSSIBLE!");
164
        }
165

  
166
        if(p.cost == -1 || p.cost > cost){
167

  
168
            //if operator is owned by other agent (maybe isProjection check should suffice)
169
            if(
170
            		op != null && 						//i.e. initial state
171
            		op.shouldRequest && 				//is projection and not pure
172
            		maxRecursionDepth > 0 &&  			//should send any requests at all
173
            		shouldSendRequestsTo.get(op.agent)	//should send requests to the action's owner
174
            		){
175
                if(LOGGER.isDebugEnabled())LOGGER.debug(domain.agent + "("+id+")" + " enqueue public operator: " + problem.getAction(op.actionHash));
176

  
177
                final RelaxationHeuristicRequest req = new RelaxationHeuristicRequest(agentID, new HeuristicComputedCallback() {
178

  
179
                    @Override
180
                    public void heuristicComputed(HeuristicResult result) {
181

  
182
                        //the request and all its sub-requests are finished - add the proposition to the EQ
183
                        if(result.getValue() < LARGE_HEURISTIC){
184
                            p.distance = result.getValue();
185
                            p.cost = result.getValue();
186
                            p.reachedBy = op;
187
                            explorationQueue.add(p);
188
                        }
189

  
190
                        //finish EQ with the new proposition
191
                        finishExplorationQueue();
192
                    }
193

  
194
                });
195

  
196
                req.waitForReply();
197
                Trace.it("increase", "'" + agentName, null, id, req.hashCode(), 1, currentState.hashCode(), req.waitingFor());
198

  
199
                sendRequest(op.operatorsIndex,cost,req,op,1);
200
            }else{
201
                //if the operator is public, consider it requested, so it is not requested when received from other agent
202
                if(op != null && problem.getAction(op.actionHash).isPublic()){
203
                    requestedActions.add(op.operatorsIndex);
204
                }
205

  
206
                //enqueue normally
207
                p.cost = cost;
208
                p.distance = cost;
209
                p.reachedBy = op;
210
                explorationQueue.add(p);
211
            }
212
        }
213
    }
214

  
215

  
216

  
217

  
218
    private void sendRequest(int action, int localCost, RelaxationHeuristicRequest req, UnaryOperator op, int recursionDepth) {
219
        if(LOGGER.isDebugEnabled())LOGGER.debug(domain.agent + "("+id+")" + " send request: " + problem.getAction(op.actionHash));
220

  
221
//		if(recursionDepth > 1){
222
//			if(LOGGER.isInfoEnabled())LOGGER.info(domain.agent + "("+id+")" + " sendRequest("+req.hashCode()+") to "+op.agent+",depth="+recursionDepth);
223
//		}
224

  
225
        //action was requested
226
        requestedActions.add(action);
227

  
228
        //store the request and wait for replies
229
        requests.put(req.hashCode(), req);
230

  
231
//		LOGGER.info(agentName + "("+id+")" + " SEND REQUEST("+req.hashCode()+"):"+requests.size() + ", rd:"+recursionDepth + ", current state hash:"+currentState.hashCode());
232
        Trace.it("send", "'" + agentName, "'" + op.agent, id, req.hashCode(), recursionDepth, currentState.hashCode(), req.waitingFor());
233

  
234
        String agent = op.agent;
235

  
236
        int[] reqOps = {op.operatorsIndex};
237

  
238
        HeuristicRequestMessage reqm = new HeuristicRequestMessage(req.hashCode(), currentState.getValues(), reqOps,recursionDepth);
239

  
240
        //request may be for self, or other agent
241
        if(replyProtocol!=null && agent.equals(domain.agent)){
242
            replyProtocol.receiveHeuristicRequestMessage(reqm, domain.agent);
243
        }else{
244
            if(LOGGER.isDebugEnabled())LOGGER.debug(agentName + "("+id+")" + " send request " + reqm.humanize(problem.getDomain()));
245

  
246
            DataAccumulator.getAccumulator().heuristicRequestMessages ++;
247
            DataAccumulator.getAccumulator().totalBytes += reqm.getBytes();
248

  
249
            requestProtocol.sendHeuristicRequestMessage(reqm, agent);
250
        }
251
    }
252

  
253

  
254

  
255

  
256

  
257
    /**
258
     * This method used to process messages, but now is used only to periodically check
259
     * if there are any local requests to process
260
     */
261
    @Override
262
    public void processMessages() {
263

  
264
        while(requests.size() == 0 && localRequests.size() > 0){
265
            LocalHeuristicRequest lr = localRequests.pollLast();
266

  
267
            getHeuristic(lr.state, lr.callback);
268
        }
269

  
270
    }
271

  
272

  
273

  
274
    /**
275
     * When a reply is received, it is either directly submitted to the waiting request, or first requests to determine costs of used public actions are sent.
276
     * @param re
277
     * @param sender
278
     */
279
    public void processReply(final HeuristicReplyWithPublicActionsMessage re, String sender){
280

  
281
        if(LOGGER.isDebugEnabled())LOGGER.debug(domain.agent + "("+id+")" + " process reply: " + re);
282

  
283
//		LOGGER.info(domain.agent + "("+id+")" + " receiveReply("+re.getRequestHash()+")");
284

  
285
        //obtain the waiting request
286
        final RelaxationHeuristicRequest req = (RelaxationHeuristicRequest) requests.get(re.getRequestHash());
287

  
288
//		requests.remove(re.getRequestHash());
289

  
290
//    	LOGGER.info(agentName + "("+id+")" + " RECEVIED REPLY("+re.getRequestHash()+"):"+requests.size() + " with "+re.usedPublicActionIDs.length+ " public actions");
291

  
292
        if(re.getRecursionDepth()>DataAccumulator.getAccumulator().maxRecursionDepth)DataAccumulator.getAccumulator().maxRecursionDepth=re.getRecursionDepth();
293
//        LOGGER.info("RECURSION DEPTH:" + re.getRecursionDepth());
294

  
295
        if(req==null){
296
            LOGGER.error(agentName + "("+id+")" + " request "+re.getRequestHash()+" does not exist!");
297
            Trace.it("error-DNE", "'" + agentName, null, id, re.getRequestHash(), re.getRecursionDepth(), currentState.hashCode());
298
            return;
299
        }
300
        if(currentState.hashCode() != re.getStateHash()){
301
            LOGGER.error(agentName + "("+id+")" + " reply state hash ("+re.getStateHash()+") does not equal current state hash ("+currentState.hashCode()+")!");
302
        }
303
        //TODO: when receiving reply, the cost of an operator should probably be updated,
304
        //      so if it is used again (and no request is sent), the updated cost is used
305
        //      but it may be more complicated
306

  
307
        //If there are no public actions to be requested, submit the reply
308
        if(re.usedPublicActionIDs.length > 0 && re.getRecursionDepth() < maxRecursionDepth){
309
//    		LOGGER.info(domain.agent + "("+id+")" + " req "+re.getRequestHash()+" waiting for "+req.waitingFor());
310

  
311

  
312
            //else, send requests for all the used actions
313
            for(final int opIndex : re.usedPublicActionIDs){
314
                final UnaryOperator op = operators.get(opIndex);
315
                Action a = problem.getAction(op.actionHash);
316

  
317
                //XXX: prospectively improve naming -> "registerPendingReply"?
318
                req.waitForReply();
319
                Trace.it("increase", "'" + agentName, null, id, req.hashCode(), re.getRecursionDepth(), currentState.hashCode(), req.waitingFor());
320

  
321
                //if the action is from the agent who sent the reply, or if the action was already requested, ignore it
322
                if(!(a.isProjection() && a.getOwner().equals(sender)) && !requestedActions.contains(opIndex)){
323

  
324
//	    			LOGGER.info(domain.agent + "("+id+")" + " prepare inner request for op-"+opIndex);
325

  
326
                    //prepare the request
327
                    RelaxationHeuristicRequest newReq = new RelaxationHeuristicRequest(agentID, new HeuristicComputedCallback() {
328

  
329
                        @Override
330
                        public void heuristicComputed(HeuristicResult result) {
331
                            //submit the reply
332

  
333
                            //XXX: prospectively improve naming -> "processRequests"?
334
                            req.receiveReply(re.heuristicValue + result.getValue(),requests);
335

  
336
                            Trace.it("receive1", "'" + agentName, "'" + op.agent, id, req.hashCode(), re.getRecursionDepth(), currentState.hashCode(), req.waitingFor(), re.heuristicValue + result.getValue());
337
                            Trace.it("decrease", "'" + agentName, null, id, req.hashCode(), re.getRecursionDepth(), currentState.hashCode(), req.waitingFor());
338
                        }
339

  
340
                    });
341

  
342
                    newReq.waitForReply();
343
                    Trace.it("increase", "'" + agentName, null, id, newReq.hashCode(), re.getRecursionDepth() + 1, currentState.hashCode(), newReq.waitingFor());
344

  
345
                    //TODO: check the cost
346
                    sendRequest(op.operatorsIndex,op.cost,newReq,op,re.getRecursionDepth()+1);
347

  
348
                }else{
349
                    //XXX: prospectively improve naming -> "processRequests", "processReply"?
350
                    req.receiveReply(0,requests);
351
                    Trace.it("decrease", "'" + agentName, null, id, req.hashCode(), re.getRecursionDepth(), currentState.hashCode(), req.waitingFor());
352
                }
353

  
354
            }
355
        }
356

  
357

  
358
        Trace.it("receive0", "'" + agentName, "'" + sender, id, req.hashCode(), re.getRecursionDepth(), currentState.hashCode(), req.waitingFor()-1, re.heuristicValue);
359

  
360
        //XXX: prospectively improve naming -> "processRequests"?
361
        req.receiveReply(re.heuristicValue,requests);
362
        Trace.it("decrease", "'" + agentName, null, id, req.hashCode(), re.getRecursionDepth(), currentState.hashCode(), req.waitingFor());
363
    }
364

  
365

  
366
    /**
367
     * Finish the exploration queue if a new proposition was added thanks to a received reply
368
     */
369
    // XXX: this strongly resembles RelaxationHeuristic.relaxedExploration, why it cannot use one universal code
370
    //      (generally evaluation coming from different heuristics of from local heuristic should be treated equally)
371
    public void finishExplorationQueue(){
372
        /*
373
         * Check this! We need to add propositions which were skipped, but also update costs
374
         * of propositions which were added, but can now be achieved another way
375
         */
376

  
377
        int unsolvedGoals = 0;
378
        for(Proposition g : goalPropositions){
379
            if(g.cost == -1)++unsolvedGoals;
380
        }
381

  
382
        //continue with the exploration
383
        while(!explorationQueue.isEmpty()){
384
            Proposition p = explorationQueue.poll();
385

  
386
            if(p.cost < p.distance) continue;
387

  
388
            if(p.isGoal && --unsolvedGoals <= 0){ //cheaper but incomplete test
389
                boolean all_goals = true;
390
                for(Proposition g : goalPropositions){ //complete test
391
                    if(g.cost == -1){
392
                        all_goals = false;
393
                        ++unsolvedGoals;
394
                    }
395
                }
396
                if(all_goals)break;
397
            }
398

  
399
            //compute cost
400
            evaluator.evaluateOperators(p.preconditionOf, p.cost);
401

  
402
            //trigger operators
403
            for(UnaryOperator op : p.preconditionOf){
404
                --op.unsatisfied_preconditions;
405
                if(op.unsatisfied_preconditions <= 0){
406
                    //use the original enqueue - do not send requests
407
                    super.enqueueIfNecessary(op.effect,op.cost,op);
408
                }
409
            }
410

  
411

  
412
        }
413

  
414
//		LOGGER.info(domain.agent + "("+id+")" + "finish eq done, pending requests:"+requests.size());
415

  
416
        //if there are no waiting requests, finish the heuristic
417
        if(requests.size() == 0){
418
//    		LOGGER.info(domain.agent + "("+id+")" + "Computed H(" + (heuristicCounter --) + "): requestHandler.queueSize():"+requestHandler.queueSize()+", replyHandler.queueSize():"+replyHandler.queueSize()+", requests.size():"+requests.size());
419

  
420
            int totalCost = evaluator.getTotalCost(goalPropositions);
421

  
422
            HelpfulActions ha = totalCost < HeuristicInterface.LARGE_HEURISTIC ? evaluator.getHelpfulActions(currentState) : new HelpfulActions();
423
            currentCallback.heuristicComputed(new HeuristicResult(totalCost,ha));
424
        }else{
425
//    		LOGGER.warn(domain.agent + "("+id+") - waiting for "+requests.size()+"!");
426
        }
427
    }
428

  
429

  
430

  
431

  
432

  
433
}
src/main/java/cz/agents/dimaptools/util/SharedProblemInfoProvider.java
4 4
import java.util.Map;
5 5
import java.util.Set;
6 6

  
7
import org.apache.log4j.Logger;
8

  
9 7
import cz.agents.dimaptools.DIMAPWorldInterface;
10 8
import cz.agents.dimaptools.communication.message.SharedProblemInfoMessage;
11 9
import cz.agents.dimaptools.communication.protocol.DistributedProblemInfoSharingProtocol;
......
13 11

  
14 12
public class SharedProblemInfoProvider {
15 13
	
16
	 private static final Logger LOGGER = Logger.getLogger(SharedProblemInfoProvider.class);
14
//	 private static final Logger LOGGER = Logger.getLogger(SharedProblemInfoProvider.class);
17 15
	
18 16
	private DistributedProblemInfoSharingProtocol protocol;
19 17
	
......
28 26
			
29 27
			@Override
30 28
			public void process(SharedProblemInfoMessage msg, String sender) {
31
				LOGGER.info(world.getAgentName() + " received send info from " + sender);
29
//				LOGGER.info(world.getAgentName() + " received send info from " + sender);
32 30
				couplings.put(sender, msg.getCouplingEstimate());
33 31
				--waitingFor;
34 32
			}
35 33
		};
36 34
		
37
		LOGGER.info(world.getAgentName() + " send info");
35
//		LOGGER.info(world.getAgentName() + " send info");
38 36
		protocol.sendSharedProblemInfoMessage(new SharedProblemInfoMessage(computeMyCoupling(world.getProblem())));
39 37
		
40 38
		while(waitingFor > 0){
src/test/java/cz/agents/dimaptools/relaxed/TestRecursiveDistributedPersonalizedHeuristic.java
1
package cz.agents.dimaptools.relaxed;
2

  
3
import org.junit.Test;
4

  
5
import cz.agents.alite.configurator.MapConfiguration;
6
import cz.agents.dimaptools.DIMAPWorldInterface;
7
import cz.agents.dimaptools.heuristic.relaxed.RecursiveDistributedRelaxationPersonalizedRequestHeuristic;
8
import cz.agents.dimaptools.heuristic.relaxed.RecursiveDistributedRelaxationReplyHeuristic;
9
import cz.agents.dimaptools.heuristic.relaxed.evaluator.AddEvaluator;
10
import cz.agents.dimaptools.search.AbstractDistributedAStarTest;
11
import cz.agents.dimaptools.search.DistributedBestFirstSearch;
12

  
13
public class TestRecursiveDistributedPersonalizedHeuristic extends AbstractDistributedAStarTest {
14

  
15
	@Test
16
	public void test() {
17
//		testProblem("truck-crane-a2");
18
//		testProblem("truck-crane-factory-a3");
19
//		testProblem("logistics-a2");
20
		testProblem("logistics-a4");
21
//		testProblem("deconfliction-a4");
22
//		testProblem("rovers-a4");
23
//		testProblem("sokoban-a1");
24
//		testProblem("sokoban-a2");
25
	}
26

  
27
	@Override
28
	public void runSearch(DIMAPWorldInterface world){
29
		DistributedBestFirstSearch search = new DistributedBestFirstSearch(world);
30
//		AStar search = new AStar(problem);
31

  
32
		RecursiveDistributedRelaxationPersonalizedRequestHeuristic req = new RecursiveDistributedRelaxationPersonalizedRequestHeuristic(world, new AddEvaluator(world.getProblem()),60);
33
		RecursiveDistributedRelaxationReplyHeuristic rep = new RecursiveDistributedRelaxationReplyHeuristic(world, new AddEvaluator(world.getProblem()),req.getRequestProtocol());
34
		req.setReplyProtocol(rep.getReplyProtocol());
35

  
36
		search.plan(new MapConfiguration("heuristic",req,"requestHeuristic",rep), searchCallback);
37
	}
38

  
39
}

Also available in: Unified diff