Conseil pour l'utilisation d'une LinkedBlockingQueue
Bonjour!
Tout d'abord je vous explique le contexte de mon projet.
J'ai développé un programme qui collecte des données de differentes, et doit les transmettre à broker (activemq pour ne pas le nommer). Afin d'éviter les goulots d'etranglements, j'ai mis en place entre ces deux composants, une fifo (LinkedBlockingQueue).
Pour l'ecrite et la lecture de message dans cette fifo, j'utilise des threads, et un executor pour regir tout ce beau monde.
Ma grande question est tout bếte : n'aurais je pas un peu abusé des threads, suis je vraiment obligé d'utiliser autant de processus si c'est juste pour qu'ils attendent (utilisation d'un while(true) {sleep()} )
Merci bien !
Le code de mon gestionnaire de fifo :
Code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class FifoManager {
private Executor executor = null;
private int nbthread = 10;
private LinkedBlockingQueue<String> listeMessage = null;
private int nbInitialFifoSize = 10000;
private List<HandlerWriter> handlerWriterList;
private List<HandlerReader> handlerReaderList;
public FifoManager() {
handlerWriterList = new ArrayList<HandlerWriter>();
handlerReaderList = new ArrayList<HandlerReader>();
executor = Executors.newFixedThreadPool(nbthread);
listeMessage = new LinkedBlockingQueue<String>((int) nbInitialFifoSize);
handlerWriterList.add(new HandlerWriter(listeMessage,"writer"));
Thread threadWriter = new Thread(handlerWriterList.get(0));
for(int indexThread = 0; indexThread < nbthread; indexThread ++) {
HandlerReader handlerReader = new HandlerReader(listeMessage, "reader-"+indexThread);
handlerReaderList.add(handlerReader);
executor.execute(handlerReader);
}
threadWriter.start();
}
public void addReaderListener(FifoListenerReader fifoListener) {
for(HandlerReader handlerReader : handlerReaderList) {
handlerReader.addListener(fifoListener);
}
}
public void pushMessage(String msg){
for(HandlerWriter handlerWriter : handlerWriterList) {
handlerWriter.pushMessage(msg);
}
}
} |
Les threads d'ecriture :
Code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class HandlerWriter implements Runnable {
private LinkedBlockingQueue<String> listeMessage;
private String name;
public HandlerWriter(LinkedBlockingQueue<String> listeMsg, String name) {
listeMessage = listeMsg ;
this.name = name;
}
@Override
public void run() {
while(true){
try{
Thread.sleep(100);
}catch(InterrruptedException e){}
}
}
public void pushMessage(String message) {
try {
listeMessage.put(message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} |
Les threads de lecture :
Code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class HandlerReader implements Runnable {
private LinkedBlockingQueue<String> listeMessage;
private List<FifoListenerReader> listListener;
private String name;
public HandlerReader( LinkedBlockingQueue<String> listeMsg, String name) {
listeMessage = listeMsg;
listListener = new ArrayList<FifoListenerReader>();
this.name = name;
}
@Override
public void run() {
while(true) {
try {
String msg= listeMessage.take();
notifyListeners(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void addListener(FifoListenerReader fifoListener) {
if(fifoListener != null) {
listListener.add(fifoListener);
}
}
public void notifyListeners(String message) {
for(FifoListenerReader fifoListener : listListener) {
fifoListener.messageAvalaible(name +":"+message);
}
}
} |