java多线程处理执行solr创建索引示例
java多线程处理执行solr创建索引示例
发布时间:2016-12-28 来源:查字典编辑
摘要:复制代码代码如下:publicclassSolrIndexerimplementsIndexer,Searcher,DisposableBe...

复制代码 代码如下:

public class SolrIndexer implements Indexer, Searcher, DisposableBean {

//~ Static fields/initializers =============================================

static final Logger logger = LoggerFactory.getLogger(SolrIndexer.class);

private static final long SHUTDOWN_TIMEOUT = 5 * 60 * 1000L; // long enough

private static final int INPUT_QUEUE_LENGTH = 16384;

//~ Instance fields ========================================================

private CommonsHttpSolrServer server;

private BlockingQueue<Operation> inputQueue;

private Thread updateThread;

volatile boolean running = true;

volatile boolean shuttingDown = false;

//~ Constructors ===========================================================

public SolrIndexer(String url) throws MalformedURLException {

server = new CommonsHttpSolrServer(url);

inputQueue = new ArrayBlockingQueue<Operation>(INPUT_QUEUE_LENGTH);

updateThread = new Thread(new UpdateTask());

updateThread.setName("SolrIndexer");

updateThread.start();

}

//~ Methods ================================================================

public void setSoTimeout(int timeout) {

server.setSoTimeout(timeout);

}

public void setConnectionTimeout(int timeout) {

server.setConnectionTimeout(timeout);

}

public void setAllowCompression(boolean allowCompression) {

server.setAllowCompression(allowCompression);

}

public void addIndex(Indexable indexable) throws IndexingException {

if (shuttingDown) {

throw new IllegalStateException("SolrIndexer is shutting down");

}

inputQueue.offer(new Operation(indexable, OperationType.UPDATE));

}

public void delIndex(Indexable indexable) throws IndexingException {

if (shuttingDown) {

throw new IllegalStateException("SolrIndexer is shutting down");

}

inputQueue.offer(new Operation(indexable, OperationType.DELETE));

}

private void updateIndices(String type, List<Indexable> indices) throws IndexingException {

if (indices == null || indices.size() == 0) {

return;

}

logger.debug("Updating {} indices", indices.size());

UpdateRequest req = new UpdateRequest("/" + type + "/update");

req.setAction(UpdateRequest.ACTION.COMMIT, false, false);

for (Indexable idx : indices) {

Doc doc = idx.getDoc();

SolrInputDocument solrDoc = new SolrInputDocument();

solrDoc.setDocumentBoost(doc.getDocumentBoost());

for (Iterator<Field> i = doc.iterator(); i.hasNext();) {

Field field = i.next();

solrDoc.addField(field.getName(), field.getValue(), field.getBoost());

}

req.add(solrDoc);

}

try {

req.process(server);

} catch (SolrServerException e) {

logger.error("SolrServerException occurred", e);

throw new IndexingException(e);

} catch (IOException e) {

logger.error("IOException occurred", e);

throw new IndexingException(e);

}

}

private void delIndices(String type, List<Indexable> indices) throws IndexingException {

if (indices == null || indices.size() == 0) {

return;

}

logger.debug("Deleting {} indices", indices.size());

UpdateRequest req = new UpdateRequest("/" + type + "/update");

req.setAction(UpdateRequest.ACTION.COMMIT, false, false);

for (Indexable indexable : indices) {

req.deleteById(indexable.getDocId());

}

try {

req.process(server);

} catch (SolrServerException e) {

logger.error("SolrServerException occurred", e);

throw new IndexingException(e);

} catch (IOException e) {

logger.error("IOException occurred", e);

throw new IndexingException(e);

}

}

public QueryResult search(Query query) throws IndexingException {

SolrQuery sq = new SolrQuery();

sq.setQuery(query.getQuery());

if (query.getFilter() != null) {

sq.addFilterQuery(query.getFilter());

}

if (query.getOrderField() != null) {

sq.addSortField(query.getOrderField(), query.getOrder() == Query.Order.DESC ? SolrQuery.ORDER.desc : SolrQuery.ORDER.asc);

}

sq.setStart(query.getOffset());

sq.setRows(query.getLimit());

QueryRequest req = new QueryRequest(sq);

req.setPath("/" + query.getType() + "/select");

try {

QueryResponse rsp = req.process(server);

SolrDocumentList docs = rsp.getResults();

QueryResult result = new QueryResult();

result.setOffset(docs.getStart());

result.setTotal(docs.getNumFound());

result.setSize(sq.getRows());

List<Doc> resultDocs = new ArrayList<Doc>(result.getSize());

for (Iterator<SolrDocument> i = docs.iterator(); i.hasNext();) {

SolrDocument solrDocument = i.next();

Doc doc = new Doc();

for (Iterator<Map.Entry<String, Object>> iter = solrDocument.iterator(); iter.hasNext();) {

Map.Entry<String, Object> field = iter.next();

doc.addField(field.getKey(), field.getValue());

}

resultDocs.add(doc);

}

result.setDocs(resultDocs);

return result;

} catch (SolrServerException e) {

logger.error("SolrServerException occurred", e);

throw new IndexingException(e);

}

}

public void destroy() throws Exception {

shutdown(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);

}

public boolean shutdown(long timeout, TimeUnit unit) {

if (shuttingDown) {

logger.info("Suppressing duplicate attempt to shut down");

return false;

}

shuttingDown = true;

String baseName = updateThread.getName();

updateThread.setName(baseName + " - SHUTTING DOWN");

boolean rv = false;

try {

// Conditionally wait

if (timeout > 0) {

updateThread.setName(baseName + " - SHUTTING DOWN (waiting)");

rv = waitForQueue(timeout, unit);

}

} finally {

// But always begin the shutdown sequence

running = false;

updateThread.setName(baseName + " - SHUTTING DOWN (informed client)");

}

return rv;

}

/**

* @param timeout

* @param unit

* @return

*/

private boolean waitForQueue(long timeout, TimeUnit unit) {

CountDownLatch latch = new CountDownLatch(1);

inputQueue.add(new StopOperation(latch));

try {

return latch.await(timeout, unit);

} catch (InterruptedException e) {

throw new RuntimeException("Interrupted waiting for queues", e);

}

}

class UpdateTask implements Runnable {

public void run() {

while (running) {

try {

syncIndices();

} catch (Throwable e) {

if (shuttingDown) {

logger.warn("Exception occurred during shutdown", e);

} else {

logger.error("Problem handling solr indexing updating", e);

}

}

}

logger.info("Shut down SolrIndexer");

}

}

void syncIndices() throws InterruptedException {

Operation op = inputQueue.poll(1000L, TimeUnit.MILLISECONDS);

if (op == null) {

return;

}

if (op instanceof StopOperation) {

((StopOperation) op).stop();

return;

}

// wait 1 second

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

}

List<Operation> ops = new ArrayList<Operation>(inputQueue.size() + 1);

ops.add(op);

inputQueue.drainTo(ops);

Map<String, List<Indexable>> deleteMap = new HashMap<String, List<Indexable>>(4);

Map<String, List<Indexable>> updateMap = new HashMap<String, List<Indexable>>(4);

for (Operation o : ops) {

if (o instanceof StopOperation) {

((StopOperation) o).stop();

} else {

Indexable indexable = o.indexable;

if (o.type == OperationType.DELETE) {

List<Indexable> docs = deleteMap.get(indexable.getType());

if (docs == null) {

docs = new LinkedList<Indexable>();

deleteMap.put(indexable.getType(), docs);

}

docs.add(indexable);

} else {

List<Indexable> docs = updateMap.get(indexable.getType());

if (docs == null) {

docs = new LinkedList<Indexable>();

updateMap.put(indexable.getType(), docs);

}

docs.add(indexable);

}

}

}

for (Iterator<Map.Entry<String, List<Indexable>>> i = deleteMap.entrySet().iterator(); i.hasNext();) {

Map.Entry<String, List<Indexable>> entry = i.next();

delIndices(entry.getKey(), entry.getValue());

}

for (Iterator<Map.Entry<String, List<Indexable>>> i = updateMap.entrySet().iterator(); i.hasNext();) {

Map.Entry<String, List<Indexable>> entry = i.next();

updateIndices(entry.getKey(), entry.getValue());

}

}

enum OperationType { DELETE, UPDATE, SHUTDOWN }

static class Operation {

OperationType type;

Indexable indexable;

Operation() {}

Operation(Indexable indexable, OperationType type) {

this.indexable = indexable;

this.type = type;

}

}

static class StopOperation extends Operation {

CountDownLatch latch;

StopOperation(CountDownLatch latch) {

this.latch = latch;

this.type = OperationType.SHUTDOWN;

}

public void stop() {

latch.countDown();

}

}

//~ Accessors ===============

}

推荐文章
猜你喜欢
附近的人在看
推荐阅读
拓展阅读
相关阅读
网友关注
最新Java学习
热门Java学习
编程开发子分类