Revision 101:b072c5028372

View differences:

src/main/java/cz/agents/alite/communication/QueuedCommunicator.java
6 6
public class QueuedCommunicator extends DefaultCommunicator implements PerformerCommunicator {
7 7

  
8 8
	private final List<MessageHandler> userMessageHandlers = new CopyOnWriteArrayList<MessageHandler>();
9
	private final QueuedMessageHandler qMessageHandler;
9
	private final List<QueuedMessageHandler> qMessageHandlers = new CopyOnWriteArrayList<QueuedMessageHandler>();
10 10

  
11 11
	public QueuedCommunicator(String address) {
12 12
		super(address);
13 13

  
14
		//TODO: remove the class from queued msghandler
15
		qMessageHandler = new QueuedMessageHandler(Object.class);
16
		super.addMessageHandler(qMessageHandler);
14
	}
15
	
16
	public void handleMessageClass(Class<?> cls){
17
		QueuedMessageHandler qmh = new QueuedMessageHandler(cls);
18
		super.addMessageHandler(qmh);
19
		qMessageHandlers.add(qmh);
17 20
	}
18 21
	
19 22
	@Override
......
29 32

  
30 33
	@Override
31 34
	public void performReceive() {
32
		if(qMessageHandler.hasMessage()){
33
			Message msg = qMessageHandler.pullMessage();
34
			for (MessageHandler messageHandler : userMessageHandlers) {
35
	            messageHandler.notify(msg);
36
	        }
35
		for(QueuedMessageHandler qmh : qMessageHandlers){
36
			if(qmh.hasMessage()){
37
				Message msg = qmh.pullMessage();
38
				for (MessageHandler messageHandler : userMessageHandlers) {
39
		            messageHandler.notify(msg);
40
		        }
41
			}
37 42
		}
38 43
	}
39 44

  
src/main/java/cz/agents/dimaptools/protocol/DefaultEncoder.java
1 1
package cz.agents.dimaptools.protocol;
2 2

  
3 3
import cz.agents.alite.communication.content.Content;
4
import cz.agents.dimaptools.message.HeuristicReplyWithPublicActionsMessage;
5
import cz.agents.dimaptools.message.HeuristicRequestMessage;
4 6
import cz.agents.dimaptools.message.PlanningFinishedMessage;
5 7
import cz.agents.dimaptools.message.ReconstructPlanMessage;
6 8
import cz.agents.dimaptools.message.StateMessage;
......
21 23
	public Content encodePlanningFinishedMessage(PlanningFinishedMessage msg) {
22 24
		return new Content(msg);
23 25
	}
26
	
27
	@Override
28
	public Content encodeHeuristicRequestMessage(HeuristicRequestMessage req) {
29
		return new Content(req);
30
	}
31

  
32
	@Override
33
	public Content encodeHeuristicReplyWithPublicActionsMessage(
34
			HeuristicReplyWithPublicActionsMessage re) {
35
		return new Content(re);
36
	}
37
	
38
	
39
	
40
	
24 41

  
25 42
	@Override
26 43
	public Object decode(Content content) {
27 44
		return content.getData();
28 45
	}
29 46

  
47
	
48

  
30 49
}
src/main/java/cz/agents/dimaptools/protocol/DistributedHeuristicProtocol.java
1
package cz.agents.dimaptools.protocol;
2

  
3
import cz.agents.alite.communication.Communicator;
4
import cz.agents.alite.communication.Message;
5
import cz.agents.alite.communication.MessageHandler;
6
import cz.agents.alite.communication.QueuedCommunicator;
7
import cz.agents.alite.communication.channel.CommunicationChannelBroadcast;
8
import cz.agents.alite.communication.content.Content;
9
import cz.agents.alite.communication.protocol.DefaultProtocol;
10
import cz.agents.dimaptools.message.HeuristicReplyWithPublicActionsMessage;
11
import cz.agents.dimaptools.message.HeuristicRequestMessage;
12
import cz.agents.dimaptools.message.PlanningFinishedMessage;
13
import cz.agents.dimaptools.message.ReconstructPlanMessage;
14
import cz.agents.dimaptools.message.StateMessage;
15

  
16
public abstract class DistributedHeuristicProtocol extends DefaultProtocol {
17
	
18
	private final EncoderInterface encoder;
19

  
20
	public DistributedHeuristicProtocol(Communicator communicator, String name, final EncoderInterface encoder, final boolean receiveRequests) {
21
		super(communicator, name);
22

  
23
		this.encoder = encoder;
24
		
25
		communicator.addMessageHandler(new MessageHandler() {
26
			
27
			@Override
28
			public void notify(Message message) {
29
				Object data = encoder.decode(message.getContent());
30
				
31
				if(receiveRequests && data instanceof HeuristicRequestMessage)receiveHeuristicRequestMessage((HeuristicRequestMessage)data,message.getSender());
32
				if(!receiveRequests && data instanceof HeuristicReplyWithPublicActionsMessage)receiveHeuristicReplyWithPublicActionsMessage((HeuristicReplyWithPublicActionsMessage)data,message.getSender());
33
				
34
				
35
			}
36
		});
37
	}
38
	
39
	
40
	public void sendHeuristicRequestMessage(HeuristicRequestMessage req, String receiver){
41
		Message message = communicator.createMessage(encoder.encodeHeuristicRequestMessage(req));
42
        message.addReceiver(receiver);
43
        communicator.sendMessage(message);
44
	}
45
	
46
	public void sendHeuristicReplyWithPublicActionsMessage(HeuristicReplyWithPublicActionsMessage re, String receiver) {
47
		Message message = communicator.createMessage(encoder.encodeHeuristicReplyWithPublicActionsMessage(re));
48
        message.addReceiver(receiver);
49
        communicator.sendMessage(message);
50
	}
51
	
52
	
53
	
54
	
55
	public abstract void receiveHeuristicRequestMessage(HeuristicRequestMessage req, String sender);
56
	
57
	public abstract void receiveHeuristicReplyWithPublicActionsMessage(HeuristicReplyWithPublicActionsMessage re, String sender);
58

  
59

  
60
	public String getAddress() {
61
		return communicator.getAddress();
62
	}
63

  
64

  
65
	public static void registerClasses(QueuedCommunicator comm){
66
		comm.handleMessageClass(HeuristicRequestMessage.class);
67
		comm.handleMessageClass(HeuristicReplyWithPublicActionsMessage.class);
68
	}
69
	
70

  
71
}
src/main/java/cz/agents/dimaptools/protocol/DistributedSearchProtocol.java
3 3
import cz.agents.alite.communication.Communicator;
4 4
import cz.agents.alite.communication.Message;
5 5
import cz.agents.alite.communication.MessageHandler;
6
import cz.agents.alite.communication.QueuedCommunicator;
6 7
import cz.agents.alite.communication.channel.CommunicationChannelBroadcast;
7 8
import cz.agents.alite.communication.protocol.DefaultProtocol;
8 9
import cz.agents.dimaptools.message.PlanningFinishedMessage;
......
60 61
	
61 62
	public abstract void receivePlanningFinishedMessage(PlanningFinishedMessage msg);
62 63

  
64
	
65
	public static void registerClasses(QueuedCommunicator comm){
66
		comm.handleMessageClass(StateMessage.class);
67
		comm.handleMessageClass(ReconstructPlanMessage.class);
68
		comm.handleMessageClass(PlanningFinishedMessage.class);
69
	}
63 70
}
src/main/java/cz/agents/dimaptools/protocol/EncoderInterface.java
1 1
package cz.agents.dimaptools.protocol;
2 2

  
3 3
import cz.agents.alite.communication.content.Content;
4
import cz.agents.dimaptools.message.HeuristicReplyWithPublicActionsMessage;
5
import cz.agents.dimaptools.message.HeuristicRequestMessage;
4 6
import cz.agents.dimaptools.message.PlanningFinishedMessage;
5 7
import cz.agents.dimaptools.message.ReconstructPlanMessage;
6 8
import cz.agents.dimaptools.message.StateMessage;
7 9

  
8 10
public interface EncoderInterface {
9 11
	
12
	//search
10 13
	public Content encodeStateMessage(StateMessage msg);
11 14
	
12 15
	public Content encodeReconstructPlanMessage(ReconstructPlanMessage msg);
13 16
	
14 17
	public Content encodePlanningFinishedMessage(PlanningFinishedMessage msg);
15 18
	
19
	//heuristics
20
	public Content encodeHeuristicRequestMessage(HeuristicRequestMessage req);
21

  
22
	public Content encodeHeuristicReplyWithPublicActionsMessage(HeuristicReplyWithPublicActionsMessage re);
23
	
16 24
	
17 25
	
18 26

  
19 27
	public Object decode(Content content);
20 28

  
29
	
30

  
21 31
}
src/main/java/cz/agents/dimaptools/relaxed/DistributedAdditiveHeuristic.java
5 5
import org.apache.log4j.Logger;
6 6

  
7 7
import cz.agents.alite.communication.Communicator;
8
import cz.agents.dimaptools.DIMAPWorldInterface;
8 9
import cz.agents.dimaptools.domain.Problem;
9 10

  
10 11
public class DistributedAdditiveHeuristic extends DistributedRelaxationHeuristic {
11 12

  
12 13
	private final Logger LOGGER;
13 14

  
14
	public DistributedAdditiveHeuristic(Problem problem, Communicator comm,boolean receiveRequests) {
15
		super(comm, problem,receiveRequests);
15
	public DistributedAdditiveHeuristic(DIMAPWorldInterface world,boolean receiveRequests) {
16
		super(world,receiveRequests);
16 17

  
17 18
		LOGGER = Logger.getLogger(problem.agent + "." + RelaxationHeuristic.class);
18 19
	}
src/main/java/cz/agents/dimaptools/relaxed/DistributedMaxHeuristic.java
3 3
import java.util.List;
4 4

  
5 5
import cz.agents.alite.communication.Communicator;
6
import cz.agents.dimaptools.DIMAPWorldInterface;
6 7
import cz.agents.dimaptools.domain.Problem;
7 8

  
8 9
public class DistributedMaxHeuristic extends DistributedRelaxationHeuristic {
9 10

  
10 11
//	private final Logger LOGGER;
11 12

  
12
	public DistributedMaxHeuristic(Problem problem, Communicator comm,boolean receiveRequests) {
13
		super(comm, problem,receiveRequests);
13
	public DistributedMaxHeuristic(DIMAPWorldInterface world,boolean receiveRequests) {
14
		super(world,receiveRequests);
14 15

  
15 16
//		LOGGER = Logger.getLogger(problem.agent + "." + RelaxationHeuristic.class);
16 17
	}
src/main/java/cz/agents/dimaptools/relaxed/DistributedRelaxationHeuristic.java
6 6

  
7 7
import org.apache.log4j.Logger;
8 8

  
9
import cz.agents.alite.communication.Communicator;
10
import cz.agents.alite.communication.Message;
11
import cz.agents.alite.communication.QueuedMessageHandler;
12
import cz.agents.alite.communication.content.Content;
13
import cz.agents.dimaptools.domain.Problem;
9
import cz.agents.dimaptools.DIMAPWorldInterface;
14 10
import cz.agents.dimaptools.domain.State;
15 11
import cz.agents.dimaptools.experiment.DataAccumulator;
16 12
import cz.agents.dimaptools.heuristic.HeuristicResult;
17 13
import cz.agents.dimaptools.message.HeuristicReplyWithPublicActionsMessage;
18 14
import cz.agents.dimaptools.message.HeuristicRequestMessage;
15
import cz.agents.dimaptools.protocol.DistributedHeuristicProtocol;
19 16

  
20 17
public abstract class DistributedRelaxationHeuristic extends RelaxationHeuristic {
21 18

  
22 19
	private final Logger LOGGER;
23 20

  
24
	private final Communicator comm;
25
	private QueuedMessageHandler requestHandler = new QueuedMessageHandler(HeuristicRequestMessage.class);
26
    private QueuedMessageHandler replyHandler = new QueuedMessageHandler(HeuristicReplyWithPublicActionsMessage.class);
27

  
21
	private final DistributedHeuristicProtocol protocol;
22
	
28 23
    private TIntObjectHashMap requests = new TIntObjectHashMap();
29 24
    private HeuristicComputedCallback currentCallback;
30 25

  
......
37 32

  
38 33

  
39 34

  
40
	public DistributedRelaxationHeuristic(Communicator comm, Problem problem,boolean receiveRequests) {
41
		super(problem,true);
35
	public DistributedRelaxationHeuristic(DIMAPWorldInterface world, boolean receiveRequests) {
36
		super(world.getProblem(),true);
42 37

  
43 38
		LOGGER = Logger.getLogger(problem.agent + "." + RelaxationHeuristic.class);
44 39

  
45
		this.comm = comm;
40
		protocol = new DistributedHeuristicProtocol(
41
				world.getCommunicator(),
42
				world.getAgentName(),
43
				world.getEncoder(),
44
				receiveRequests){
45

  
46
			@Override
47
			public void receiveHeuristicRequestMessage(HeuristicRequestMessage req, String sender) {
48
				if(LOGGER.isDebugEnabled())LOGGER.debug(protocol.getAddress() + "("+id+")" + " handle request from " + sender + ": " + req.humanize(problem.getDomain()));
49
            	processRequest(req,sender);
50
			}
51

  
52
			@Override
53
			public void receiveHeuristicReplyWithPublicActionsMessage(HeuristicReplyWithPublicActionsMessage re, String sender) {
54
            	if(LOGGER.isDebugEnabled())LOGGER.debug(protocol.getAddress() + "("+id+")" + " handle reply from " + sender + ": " + re);
55
            	processReply(re);
56
			}
57
			
58
		};
46 59

  
47 60
		if(receiveRequests){
48 61
			id = "receive";
49
			comm.addMessageHandler(requestHandler);
62
//			comm.addMessageHandler(requestHandler);
50 63
		}else{
51 64
			id = "send";
52
			comm.addMessageHandler(replyHandler);
65
//			comm.addMessageHandler(replyHandler);
53 66
		}
54 67

  
55 68
		sendRequests = !receiveRequests;
......
77 90
		relaxedExploration();
78 91

  
79 92
		if(requests.size() == 0){
80
			LOGGER.info(domain.agent + "("+id+")" + "Computed H(" + (heuristicCounter --) + "): requestHandler.queueSize():"+requestHandler.queueSize()+", replyHandler.queueSize():"+replyHandler.queueSize()+", requests.size():"+requests.size());
93
			LOGGER.info(domain.agent + "("+id+")" + "Computed H(" + (heuristicCounter --) + "): , requests.size():"+requests.size());
81 94
			callback.heuristicComputed(new HeuristicResult(getTotalCost()));
82 95
		}
83 96
	}
......
121 134

  
122 135
		HeuristicRequestMessage req = new HeuristicRequestMessage(reqHash, currentState.getValues(), reqOps);
123 136

  
124
		if(LOGGER.isDebugEnabled())LOGGER.debug(comm.getAddress() + "("+id+")" + " send request " + req.humanize(problem.getDomain()));
137
		if(LOGGER.isDebugEnabled())LOGGER.debug(protocol.getAddress() + "("+id+")" + " send request " + req.humanize(problem.getDomain()));
125 138

  
126 139
		DataAccumulator.getAccumulator().heuristicRequestMessages ++;
127 140
		DataAccumulator.getAccumulator().totalBytes += req.getBytes();
128 141

  
129
		Message message = comm.createMessage(new Content(req));
130
        message.addReceiver(agent);
131
        comm.sendMessage(message);
142
		protocol.sendHeuristicRequestMessage(req, agent);
132 143
	}
133 144

  
134 145

  
......
137 148

  
138 149
	@Override
139 150
	public void processMessages() {
140
		if(!initialized){
141
			LOGGER.warn("NOT INITIALIZED YET!");
142
		}
143

  
144

  
145

  
146
//		LOGGER.info(domain.agent + "("+id+")" + " requestHandler.queueSize():"+requestHandler.queueSize()+", replyHandler.queueSize():"+replyHandler.queueSize()+", requests.size():"+requests.size());
147
		boolean log = requestHandler.queueSize() > 0 || replyHandler.queueSize() > 0;
148

  
149
		if(log)if(LOGGER.isDebugEnabled())LOGGER.debug(comm.getAddress() + "("+id+")" + " process messages " +  requestHandler.queueSize() + "/" + replyHandler.queueSize() + " ...");
150

  
151

  
152

  
153
		while(requestHandler.hasMessage()){
154
            final Message message = requestHandler.pullMessage();
155
            if (!message.getSender().equals(comm.getAddress())) {
156

  
157
            	HeuristicRequestMessage req = (HeuristicRequestMessage)message.getContent().getData();
158

  
159
            	if(LOGGER.isDebugEnabled())LOGGER.debug(comm.getAddress() + "("+id+")" + " handle request from " + message.getSender() + ": " + req.humanize(problem.getDomain()));
160

  
161
            	processRequest(req,message.getSender());
162

  
163
            }
164
		}
165

  
166
		while(replyHandler.hasMessage()){
167
            final Message message = replyHandler.pullMessage();
168
            if (!message.getSender().equals(comm.getAddress())) {
169

  
170
            	HeuristicReplyWithPublicActionsMessage re = (HeuristicReplyWithPublicActionsMessage)message.getContent().getData();
171

  
172
            	if(LOGGER.isDebugEnabled())LOGGER.debug(comm.getAddress() + "("+id+")" + " handle reply from " + message.getSender() + ": " + re);
173

  
174
            	processReply(re);
175
            }
176
		}
177

  
178

  
179

  
180
//		LOGGER.info(domain.agent + "("+id+")" + "DONE: requestHandler.queueSize():"+requestHandler.queueSize()+", replyHandler.queueSize():"+replyHandler.queueSize()+", requests.size():"+requests.size());
181

  
182
		if(log)if(LOGGER.isDebugEnabled())LOGGER.debug(comm.getAddress() + "("+id+")" + " process messages done");
183

  
184 151
		while(requests.size() == 0 && localRequests.size() > 0){
185 152
			LocalHeuristicRequest lr = localRequests.pollLast();
186 153
//			LOGGER.info(comm.getAddress() + "("+id+") localRequests: " + localRequests.size());
......
227 194
		DataAccumulator.getAccumulator().heuristicReplyMessages ++;
228 195
		DataAccumulator.getAccumulator().totalBytes += re.getBytes();
229 196

  
230
		Message message = comm.createMessage(new Content(re));
231
        message.addReceiver(agent);
232
        comm.sendMessage(message);
197
		protocol.sendHeuristicReplyWithPublicActionsMessage(re,agent);
198
		
233 199
	}
234 200

  
235 201
	public void processReply(HeuristicReplyWithPublicActionsMessage re){
src/main/java/cz/agents/dimaptools/search/DistributedAStar.java
232 232

  
233 233
        }else{
234 234
        	
235
        	LOGGER.info(comm.getAddress() + " send reconstruct msg " + state.hashCode() + " to " + lastState.getParentActionOwner());
235
//        	LOGGER.info(comm.getAddress() + " send reconstruct msg " + state.hashCode() + " to " + lastState.getParentActionOwner());
236 236
        	protocol.sendReconstructPlanMessage(new ReconstructPlanMessage(plan,lastState.hashCode()), lastState.getParentActionOwner());
237 237
        }
238 238
	}
src/test/java/cz/agents/dimaptools/relaxed/TestDistributedAdditiveHeuristic.java
2 2

  
3 3
import org.junit.Test;
4 4

  
5
import cz.agents.alite.communication.Communicator;
6 5
import cz.agents.alite.configurator.MapConfiguration;
7 6
import cz.agents.dimaptools.DIMAPWorldInterface;
8
import cz.agents.dimaptools.domain.Problem;
9 7
import cz.agents.dimaptools.heuristic.HeuristicInterface;
10 8
import cz.agents.dimaptools.search.AbstractDistributedAStarTest;
11 9
import cz.agents.dimaptools.search.DistributedAStar;
......
18 16
//		testProblem("truck-crane-factory-a3");
19 17
//		testProblem("logistics-a2");
20 18
//		testProblem("logistics-a4");
21
//		testProblem("deconfliction-a4");
19
		testProblem("deconfliction-a4");
22 20
//		testProblem("rovers-a4");
23 21
//		testProblem("sokoban-a1");
24 22
//		testProblem("sokoban-a2");
......
29 27
		DistributedAStar search = new DistributedAStar(world);
30 28
//		AStar search = new AStar(problem);
31 29

  
32
		HeuristicInterface heuristic = new DistributedAdditiveHeuristic(world.getProblem(),world.getCommunicator(),false);
33
		HeuristicInterface reqHeuristic = new DistributedAdditiveHeuristic(world.getProblem(),world.getCommunicator(),true);
30
		HeuristicInterface heuristic = new DistributedAdditiveHeuristic(world,false);
31
		HeuristicInterface reqHeuristic = new DistributedAdditiveHeuristic(world,true);
34 32

  
35 33
		search.plan(new MapConfiguration("heuristic",heuristic,"requestHeuristic",reqHeuristic), searchCallback);
36 34
	}
src/test/java/cz/agents/dimaptools/search/AbstractDistributedAStarTest.java
28 28
import cz.agents.dimaptools.preprocess.SASParser;
29 29
import cz.agents.dimaptools.preprocess.SASPreprocessor;
30 30
import cz.agents.dimaptools.protocol.DefaultEncoder;
31
import cz.agents.dimaptools.protocol.DistributedHeuristicProtocol;
32
import cz.agents.dimaptools.protocol.DistributedSearchProtocol;
31 33

  
32 34
public abstract class AbstractDistributedAStarTest {
33 35

  
......
150 152
	public PerformerCommunicator initCommunicator(String address){
151 153
		QueuedCommunicator communicator = new QueuedCommunicator(address);
152 154
        try {
155
//        	communicator.handleMessageClass(Object.class); //TODO: works but not very well
156
        	
157
        	DistributedSearchProtocol.registerClasses(communicator);
158
        	DistributedHeuristicProtocol.registerClasses(communicator);
159
        	
153 160
            communicator.addChannel(new DirectCommunicationChannelAsync(communicator, receiverTable, executorService));
154 161
        } catch (CommunicationChannelException e) {
155 162
            LOGGER.fatal("Communication channel creation error!", e);
src/test/java/cz/agents/dimaptools/search/TestDistributedAStar.java
4 4

  
5 5
import java.util.List;
6 6

  
7
import org.apache.log4j.Level;
8 7
import org.apache.log4j.Logger;
9 8
import org.junit.Test;
10 9

  
......
20 19
	public void test() {
21 20
//		LOGGER.setLevel(Level.INFO);
22 21
//		testProblem("rovers-a4");
23
//		testProblem("sokoban-a2");
22
		testProblem("sokoban-a2");
24 23
//		testProblem("logistics-a2");
25 24
	}
26 25

  

Also available in: Unified diff