Revision 185:1939d21ac5cd

View differences:

src/main/java/cz/agents/dimaptools/heuristic/InitializableHeuristic.java
1
package cz.agents.dimaptools.heuristic;
2

  
3
public interface InitializableHeuristic {
4
	
5
	public void initialize();
6

  
7
}
src/main/java/cz/agents/dimaptools/heuristic/relaxed/RecursiveDistributedRelaxationPersonalizedRequestHeuristic.java
1 1
package cz.agents.dimaptools.heuristic.relaxed;
2 2

  
3
import gnu.trove.TIntHashSet;
4
import gnu.trove.TIntObjectHashMap;
5

  
6 3
import java.util.HashMap;
7
import java.util.LinkedList;
8 4
import java.util.Map;
9 5

  
10 6
import org.apache.log4j.Logger;
11 7

  
12
import cz.agents.alite.communication.Communicator;
13 8
import cz.agents.dimaptools.DIMAPWorldInterface;
14 9
import cz.agents.dimaptools.communication.message.HeuristicReplyWithPublicActionsMessage;
15
import cz.agents.dimaptools.communication.message.HeuristicRequestMessage;
16
import cz.agents.dimaptools.communication.protocol.DistributedHeuristicReplyProtocol;
17
import cz.agents.dimaptools.communication.protocol.DistributedHeuristicRequestProtocol;
18 10
import cz.agents.dimaptools.experiment.DataAccumulator;
19 11
import cz.agents.dimaptools.experiment.Trace;
20 12
import cz.agents.dimaptools.heuristic.HeuristicInterface;
21 13
import cz.agents.dimaptools.heuristic.HeuristicResult;
14
import cz.agents.dimaptools.heuristic.InitializableHeuristic;
22 15
import cz.agents.dimaptools.heuristic.relaxed.evaluator.EvaluatorInterface;
23 16
import cz.agents.dimaptools.model.Action;
24
import cz.agents.dimaptools.model.State;
25 17
import cz.agents.dimaptools.util.SharedProblemInfoProvider;
26 18

  
27
public class RecursiveDistributedRelaxationPersonalizedRequestHeuristic extends RelaxationHeuristic {
19
public class RecursiveDistributedRelaxationPersonalizedRequestHeuristic extends RecursiveDistributedRelaxationRequestHeuristic implements InitializableHeuristic {
28 20

  
29 21
    private final Logger LOGGER = Logger.getLogger(RecursiveDistributedRelaxationPersonalizedRequestHeuristic.class);
30 22

  
31
    private final String id = "REQ";
32
    private final DistributedHeuristicRequestProtocol requestProtocol;
33
    private DistributedHeuristicReplyProtocol replyProtocol = null;
34

  
35
    private final Communicator comm;
36
    private final String agentName;
37
    private final int agentID;
38

  
39
    /**
40
     * Sent requests waiting for the reply
41
     */
42
    private TIntObjectHashMap requests = new TIntObjectHashMap();
43
    private HeuristicComputedCallback currentCallback;
44

  
45
    /**
46
     * Queue requests from the search which cannot be computed at the moment
47
     */
48
    private LinkedList<LocalHeuristicRequest> localRequests = new LinkedList<LocalHeuristicRequest>();
49

  
50
//	private int heuristicCounter = 0;
51

  
52
    private final int maxRecursionDepth;
53

  
54
    /**
55
     * Remember actions already requested
56
     */
57
    private TIntHashSet requestedActions = new TIntHashSet();
58 23
    
59
    
24
    private final SharedProblemInfoProvider provider;
25
    private final int requestThreshold;
60 26
    private final Map<String,Boolean> shouldSendRequestsTo = new HashMap<>();
61 27

  
62 28

  
......
67 33
    }
68 34

  
69 35
    public RecursiveDistributedRelaxationPersonalizedRequestHeuristic(DIMAPWorldInterface world,EvaluatorInterface evaluator,int maxRecursionDepth, int requestThreshold) {
70
        super(world.getProblem(),evaluator,true);
36
        super(world,evaluator,maxRecursionDepth);
71 37
        
72
        comm = world.getCommunicator();
73
        agentName = world.getAgentName();
74
        agentID = world.getAgentID();
38
        shouldSendRequestsTo.put(world.getAgentName(), true);	//necessary for self-requests
75 39
        
76
        SharedProblemInfoProvider provider = new SharedProblemInfoProvider(world,world.getNumberOfAgents());
40
        provider = new SharedProblemInfoProvider(world,world.getNumberOfAgents());
41
        this.requestThreshold=requestThreshold;
42
        
43

  
44
    }
45
    
46
    public void shareKnowledge(){
47
    	provider.sendInfoAndWait();
77 48
        for(String agent : provider.getKnownAgents()){
78 49
			if(provider.getCoupling(agent) > requestThreshold){
79 50
				shouldSendRequestsTo.put(agent, false);
80
				if(LOGGER.isInfoEnabled())LOGGER.info("  " + world.getAgentName() + " will not send requests to " + agent + "(" + provider.getCoupling(agent) + ")");
51
				if(LOGGER.isInfoEnabled())LOGGER.info("  " + agentName + " will not send requests to " + agent + "(" + provider.getCoupling(agent) + ")");
81 52
			}else{
82 53
				shouldSendRequestsTo.put(agent, true);
83
				if(LOGGER.isInfoEnabled())LOGGER.info("  " + world.getAgentName() + " will send requests to " + agent + "(" + provider.getCoupling(agent) + ")");
54
				if(LOGGER.isInfoEnabled())LOGGER.info("  " + agentName + " will send requests to " + agent + "(" + provider.getCoupling(agent) + ")");
84 55
			}
85 56
		}
86

  
87
        this.maxRecursionDepth = maxRecursionDepth > -1 ? maxRecursionDepth : Integer.MAX_VALUE;
88

  
89
        requestProtocol = new DistributedHeuristicRequestProtocol(
90
                world.getCommunicator(),
91
                world.getAgentName(),
92
                world.getEncoder()){
93

  
94
            @Override
95
            public void receiveHeuristicReplyWithPublicActionsMessage(HeuristicReplyWithPublicActionsMessage re, String sender) {
96
                if(LOGGER.isDebugEnabled())LOGGER.debug(requestProtocol.getAddress() + "("+id+")" + " handle reply from " + sender + ": " + re);
97
                processReply(re,sender);
98
            }
99

  
100
        };
101

  
102 57
    }
103 58

  
104
    public DistributedHeuristicRequestProtocol getRequestProtocol(){
105
        return requestProtocol;
106
    }
107 59

  
108
    public void setReplyProtocol(DistributedHeuristicReplyProtocol replyProtocol){
109
        this.replyProtocol = replyProtocol;
110
    }
111

  
112

  
113
    @Override
114
    public void getHeuristic(State state, HeuristicComputedCallback callback) {
115
        if(LOGGER.isDebugEnabled())LOGGER.debug(domain.agent + "("+id+")" + " get heuristic: " + domain.humanize(state.getValues()));
116

  
117
//		LOGGER.info(domain.agent + "("+id+")" + "Get H(" + (heuristicCounter ++) + "): requestHandler.queueSize():"+requestHandler.queueSize()+", replyHandler.queueSize():"+replyHandler.queueSize()+", requests.size():"+requests.size());
118

  
119
        //if waiting for some replies, queue the local request
120
        if(requests.size() > 0){
121
            localRequests.add(new LocalHeuristicRequest(state, callback));
122
//			LOGGER.info(agentName + "("+id+") localRequests: " + localRequests.size());
123
            return;
124
        }
125

  
126
        //build the EQ
127
//        LOGGER.info(agentName + "("+id+") compute h for: " + state.hashCode());
128

  
129
        currentCallback = callback;
130
        currentState = state;
131
        requestedActions.clear();
132

  
133
        buildGoalPropositions(problem.goalSuperState);
134
        setupExplorationQueue();
135
        setupExplorationQueueState(state);
136
        relaxedExploration();
137

  
138
        //if not waiting for replies, finish the heuristic
139
        if(requests.size() == 0){
140
//			LOGGER.info(domain.agent + "("+id+")" + " Computed H(" + (heuristicCounter --) + "): requestHandler.queueSize():"+requestHandler.queueSize()+", replyHandler.queueSize():"+replyHandler.queueSize()+", requests.size():"+requests.size());
141
            int totalCost = evaluator.getTotalCost(goalPropositions);
142
            HelpfulActions ha = totalCost < HeuristicInterface.LARGE_HEURISTIC ? evaluator.getHelpfulActions(state) : new HelpfulActions();
143
            callback.heuristicComputed(new HeuristicResult(totalCost,ha));
144
        }
145
    }
146 60

  
147 61
    /**
148 62
     * Override the default enqueue operation
......
215 129

  
216 130

  
217 131

  
218
    private void sendRequest(int action, int localCost, RelaxationHeuristicRequest req, UnaryOperator op, int recursionDepth) {
219
        if(LOGGER.isDebugEnabled())LOGGER.debug(domain.agent + "("+id+")" + " send request: " + problem.getAction(op.actionHash));
220

  
221
//		if(recursionDepth > 1){
222
//			if(LOGGER.isInfoEnabled())LOGGER.info(domain.agent + "("+id+")" + " sendRequest("+req.hashCode()+") to "+op.agent+",depth="+recursionDepth);
223
//		}
224

  
225
        //action was requested
226
        requestedActions.add(action);
227

  
228
        //store the request and wait for replies
229
        requests.put(req.hashCode(), req);
230

  
231
//		LOGGER.info(agentName + "("+id+")" + " SEND REQUEST("+req.hashCode()+"):"+requests.size() + ", rd:"+recursionDepth + ", current state hash:"+currentState.hashCode());
232
        Trace.it("send", "'" + agentName, "'" + op.agent, id, req.hashCode(), recursionDepth, currentState.hashCode(), req.waitingFor());
233

  
234
        String agent = op.agent;
235

  
236
        int[] reqOps = {op.operatorsIndex};
237

  
238
        HeuristicRequestMessage reqm = new HeuristicRequestMessage(req.hashCode(), currentState.getValues(), reqOps,recursionDepth);
239

  
240
        //request may be for self, or other agent
241
        if(replyProtocol!=null && agent.equals(domain.agent)){
242
            replyProtocol.receiveHeuristicRequestMessage(reqm, domain.agent);
243
        }else{
244
            if(LOGGER.isDebugEnabled())LOGGER.debug(agentName + "("+id+")" + " send request " + reqm.humanize(problem.getDomain()));
245

  
246
            DataAccumulator.getAccumulator().heuristicRequestMessages ++;
247
            DataAccumulator.getAccumulator().totalBytes += reqm.getBytes();
248

  
249
            requestProtocol.sendHeuristicRequestMessage(reqm, agent);
250
        }
251
    }
252

  
253

  
254

  
255

  
256

  
257
    /**
258
     * This method used to process messages, but now is used only to periodically check
259
     * if there are any local requests to process
260
     */
261
    @Override
262
    public void processMessages() {
263

  
264
        while(requests.size() == 0 && localRequests.size() > 0){
265
            LocalHeuristicRequest lr = localRequests.pollLast();
266

  
267
            getHeuristic(lr.state, lr.callback);
268
        }
269

  
270
    }
271

  
272

  
273

  
274 132
    /**
275 133
     * When a reply is received, it is either directly submitted to the waiting request, or first requests to determine costs of used public actions are sent.
276 134
     * @param re
......
318 176
                req.waitForReply();
319 177
                Trace.it("increase", "'" + agentName, null, id, req.hashCode(), re.getRecursionDepth(), currentState.hashCode(), req.waitingFor());
320 178

  
179
                
321 180
                //if the action is from the agent who sent the reply, or if the action was already requested, ignore it
322
                if(!(a.isProjection() && a.getOwner().equals(sender)) && !requestedActions.contains(opIndex)){
181
                if(!(a.isProjection() && a.getOwner().equals(sender)) && !requestedActions.contains(opIndex) && shouldSendRequestsTo.get(a.getOwner())){
323 182

  
324 183
//	    			LOGGER.info(domain.agent + "("+id+")" + " prepare inner request for op-"+opIndex);
325 184

  
......
362 221
        Trace.it("decrease", "'" + agentName, null, id, req.hashCode(), re.getRecursionDepth(), currentState.hashCode(), req.waitingFor());
363 222
    }
364 223

  
224
	@Override
225
	public void initialize() {
226
		shareKnowledge();
227
	}
365 228

  
366
    /**
367
     * Finish the exploration queue if a new proposition was added thanks to a received reply
368
     */
369
    // XXX: this strongly resembles RelaxationHeuristic.relaxedExploration, why it cannot use one universal code
370
    //      (generally evaluation coming from different heuristics of from local heuristic should be treated equally)
371
    public void finishExplorationQueue(){
372
        /*
373
         * Check this! We need to add propositions which were skipped, but also update costs
374
         * of propositions which were added, but can now be achieved another way
375
         */
376 229

  
377
        int unsolvedGoals = 0;
378
        for(Proposition g : goalPropositions){
379
            if(g.cost == -1)++unsolvedGoals;
380
        }
381

  
382
        //continue with the exploration
383
        while(!explorationQueue.isEmpty()){
384
            Proposition p = explorationQueue.poll();
385

  
386
            if(p.cost < p.distance) continue;
387

  
388
            if(p.isGoal && --unsolvedGoals <= 0){ //cheaper but incomplete test
389
                boolean all_goals = true;
390
                for(Proposition g : goalPropositions){ //complete test
391
                    if(g.cost == -1){
392
                        all_goals = false;
393
                        ++unsolvedGoals;
394
                    }
395
                }
396
                if(all_goals)break;
397
            }
398

  
399
            //compute cost
400
            evaluator.evaluateOperators(p.preconditionOf, p.cost);
401

  
402
            //trigger operators
403
            for(UnaryOperator op : p.preconditionOf){
404
                --op.unsatisfied_preconditions;
405
                if(op.unsatisfied_preconditions <= 0){
406
                    //use the original enqueue - do not send requests
407
                    super.enqueueIfNecessary(op.effect,op.cost,op);
408
                }
409
            }
410

  
411

  
412
        }
413

  
414
//		LOGGER.info(domain.agent + "("+id+")" + "finish eq done, pending requests:"+requests.size());
415

  
416
        //if there are no waiting requests, finish the heuristic
417
        if(requests.size() == 0){
418
//    		LOGGER.info(domain.agent + "("+id+")" + "Computed H(" + (heuristicCounter --) + "): requestHandler.queueSize():"+requestHandler.queueSize()+", replyHandler.queueSize():"+replyHandler.queueSize()+", requests.size():"+requests.size());
419

  
420
            int totalCost = evaluator.getTotalCost(goalPropositions);
421

  
422
            HelpfulActions ha = totalCost < HeuristicInterface.LARGE_HEURISTIC ? evaluator.getHelpfulActions(currentState) : new HelpfulActions();
423
            currentCallback.heuristicComputed(new HeuristicResult(totalCost,ha));
424
        }else{
425
//    		LOGGER.warn(domain.agent + "("+id+") - waiting for "+requests.size()+"!");
426
        }
427
    }
230
   
428 231

  
429 232

  
430 233

  
src/main/java/cz/agents/dimaptools/heuristic/relaxed/RecursiveDistributedRelaxationRequestHeuristic.java
25 25

  
26 26
    private final Logger LOGGER = Logger.getLogger(RecursiveDistributedRelaxationRequestHeuristic.class);
27 27

  
28
    private final String id = "REQ";
28
    protected final String id = "REQ";
29 29
    private final DistributedHeuristicRequestProtocol requestProtocol;
30 30
    private DistributedHeuristicReplyProtocol replyProtocol = null;
31 31

  
32 32
    private final Communicator comm;
33
    private final String agentName;
34
    private final int agentID;
33
    protected final String agentName;
34
    protected final int agentID;
35 35

  
36 36
    /**
37 37
     * Sent requests waiting for the reply
38 38
     */
39
    private TIntObjectHashMap requests = new TIntObjectHashMap();
39
    protected TIntObjectHashMap requests = new TIntObjectHashMap();
40 40
    private HeuristicComputedCallback currentCallback;
41 41

  
42 42
    /**
......
46 46

  
47 47
//	private int heuristicCounter = 0;
48 48

  
49
    private final int maxRecursionDepth;
49
    protected final int maxRecursionDepth;
50 50

  
51 51
    /**
52 52
     * Remember actions already requested
53 53
     */
54
    private TIntHashSet requestedActions = new TIntHashSet();
54
    protected TIntHashSet requestedActions = new TIntHashSet();
55 55

  
56 56

  
57 57

  
......
197 197

  
198 198

  
199 199

  
200
    private void sendRequest(int action, int localCost, RelaxationHeuristicRequest req, UnaryOperator op, int recursionDepth) {
200
    protected void sendRequest(int action, int localCost, RelaxationHeuristicRequest req, UnaryOperator op, int recursionDepth) {
201 201
        if(LOGGER.isDebugEnabled())LOGGER.debug(domain.agent + "("+id+")" + " send request: " + problem.getAction(op.actionHash));
202 202

  
203 203
//		if(recursionDepth > 1){
src/main/java/cz/agents/dimaptools/util/SharedProblemInfoProvider.java
4 4
import java.util.Map;
5 5
import java.util.Set;
6 6

  
7
import org.apache.log4j.Logger;
8

  
7 9
import cz.agents.dimaptools.DIMAPWorldInterface;
8 10
import cz.agents.dimaptools.communication.message.SharedProblemInfoMessage;
9 11
import cz.agents.dimaptools.communication.protocol.DistributedProblemInfoSharingProtocol;
......
11 13

  
12 14
public class SharedProblemInfoProvider {
13 15
	
14
//	 private static final Logger LOGGER = Logger.getLogger(SharedProblemInfoProvider.class);
16
	 private static final Logger LOGGER = Logger.getLogger(SharedProblemInfoProvider.class);
15 17
	
16 18
	private DistributedProblemInfoSharingProtocol protocol;
17 19
	
18 20
	private Map<String,Integer> couplings = new HashMap<>();
19 21
	private int waitingFor;
22
	final DIMAPWorldInterface world;
20 23
	
21 24
	public SharedProblemInfoProvider(final DIMAPWorldInterface world, int totalAgents){
22 25
		
......
32 35
			}
33 36
		};
34 37
		
35
//		LOGGER.info(world.getAgentName() + " send info");
38
		this.world = world;
39
		
40
	}
41
	
42
	public void sendInfoAndWait(){
43
		LOGGER.info(world.getAgentName() + " send info and wait...");
36 44
		protocol.sendSharedProblemInfoMessage(new SharedProblemInfoMessage(computeMyCoupling(world.getProblem())));
37 45
		
38 46
		while(waitingFor > 0){
src/test/java/cz/agents/dimaptools/relaxed/TestRecursiveDistributedPersonalizedHeuristic.java
32 32
		RecursiveDistributedRelaxationPersonalizedRequestHeuristic req = new RecursiveDistributedRelaxationPersonalizedRequestHeuristic(world, new AddEvaluator(world.getProblem()),60);
33 33
		RecursiveDistributedRelaxationReplyHeuristic rep = new RecursiveDistributedRelaxationReplyHeuristic(world, new AddEvaluator(world.getProblem()),req.getRequestProtocol());
34 34
		req.setReplyProtocol(rep.getReplyProtocol());
35
		
36
		req.shareKnowledge();
35 37

  
36 38
		search.plan(new MapConfiguration("heuristic",req,"requestHeuristic",rep), searchCallback);
37 39
	}
src/test/java/cz/agents/dimaptools/util/TestSharedProblemInfoProvider.java
21 21
	@Override
22 22
	public void runSearch(DIMAPWorldInterface world) {
23 23
		SharedProblemInfoProvider provider = new SharedProblemInfoProvider(world,world.getNumberOfAgents());
24
		provider.sendInfoAndWait();
24 25
		
25 26
		System.out.println(world.getAgentName() + ":");
26 27
		for(String agent : provider.getKnownAgents()){

Also available in: Unified diff