DBMNG数据库管理与应用

所有存在都是独创。
当前位置:首页 > 经验分享 > Java开发

【队列 高并发】java web瞬间高并发的解决方法

1、任何的高并发,请求总是会有一个顺序的

2、java的队列的数据结构是先进先出的取值顺序

3、BlockingQueue类(线程安全)(使用方法可以百度

一般使用LinkedBlockingQueue

利用以上几点,我们可以把高并发时候的请求放入一个队列,队列的大小可以自己定义,比如队列容量为1000个数据,那么可以利用过滤器或者拦截器把当前的请求放入队列,如果队列的容量满了,其余的请求可以丢掉或者作出相应回复

具体实施:

利用生产者、消费者模型:

将队列的请求一一处理完。

 

上代码:

/**
* @author fuguangli
* @description 前沿消费者类
* @Create date:    2017/3/7
* @using   EXAMPLE
*/
public class Customer implements Runnable{


   /**
    *         抛出异常    特殊值        阻塞         超时
    插入    add(e)    offer(e)    put(e)    offer(e, time, unit)
    移除    remove()    poll()    take()    poll(time, unit)
    检查    element()    peek()    不可用    不可用
    */
   private BlockingQueue blockingQueue;
   private AtomicInteger count = new AtomicInteger();
   public Customer(BlockingQueue blockingQueue) {
       this.blockingQueue = blockingQueue;
   }

   /**
    * When an object implementing interface <code>Runnable</code> is used
    * to create a thread, starting the thread causes the object's
    * <code>run</code> method to be called in that separately executing
    * thread.
    * <p/>
    * The general contract of the method <code>run</code> is that it may
    * take any action whatsoever.
    *
    * @see Thread#run()
    */
   @Override
   public void run() {
       System.out.println("消费者线程启动...");
       LockFlag.setCustomerRunningFlag(true);
       try {
           while (LockFlag.getProducerRunningFlag()){
               System.out.println(Thread.currentThread().getId()+"I'm Customer.Queue current size="+blockingQueue.size());
               String data = (String) blockingQueue.poll(10, TimeUnit.SECONDS);
               if(data!=null){
                   System.out.println(Thread.currentThread().getId()+"*************正在消费数据 data="+data);
               }else{
                   //表示超过取值时间,视为生产者不再生产数据
                   System.out.println(Thread.currentThread().getId()+"队列为空无数据,请检查生产者是否阻塞");
               }
               Thread.sleep(50);
           }
           System.err.println("消费者程序执行完毕");
       } catch (InterruptedException e) {
           e.printStackTrace();
           System.err.println("消费者程序退出");
           LockFlag.setCustomerRunningFlag(false);//异常退出线程
           Thread.currentThread().interrupt();
       }
   }
}


package com.qysxy.framework.queue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author fuguangli
* @description 队列生产者类
* @Create date:    2017/3/7
* @using       EXAMPLE
*/
public class Producer implements Runnable{


   /**
    *         抛出异常    特殊值        阻塞         超时
    插入    add(e)    offer(e)    put(e)    offer(e, time, unit)
    移除    remove()    poll()    take()    poll(time, unit)
    检查    element()    peek()    不可用    不可用
    */
   private BlockingQueue blockingQueue;
   private AtomicInteger count = new AtomicInteger();
   public Producer(BlockingQueue blockingQueue) {
       this.blockingQueue = blockingQueue;
   }

   /**
    * When an object implementing interface <code>Runnable</code> is used
    * to create a thread, starting the thread causes the object's
    * <code>run</code> method to be called in that separately executing
    * thread.
    * <p/>
    * The general contract of the method <code>run</code> is that it may
    * take any action whatsoever.
    *
    * @see Thread#run()
    */
   @Override
   public void run() {
       System.out.println("生产者线程启动...");
       LockFlag.setProducerRunningFlag(true);
       try {
           while (LockFlag.getProducerRunningFlag()){
               String data = "data:"+count.incrementAndGet();
               if(blockingQueue.offer(data,10, TimeUnit.SECONDS)){
                   //返回true表示生产数据正确
                   System.out.println("^^^^^^^^^^^^^^正在生产数据 data="+data);
               }else {
                   //表示阻塞时间内还没有生产者生产数据
                   System.out.println("生产者异常,无法生产数据");
               }
               Thread.sleep(50);

           }
       } catch (InterruptedException e) {
           e.printStackTrace();
           System.err.println("生产者程序退出");
           LockFlag.setProducerRunningFlag(false);//异常退出线程
           Thread.currentThread().interrupt();
       }
   }
}


package com.qysxy.framework.queue;

/**
* @author fuguangli
* @description 前沿生产者消费者模型的锁类
* @Create date:    2017/3/7
*/
public class LockFlag {
   /**
    * 生产者互斥锁
    */
   private static Boolean producerRunningFlag = false;
   /**
    * 消费者互斥锁
    */
   private static Boolean customerRunningFlag = false;

   public static Boolean getProducerRunningFlag() {
       return producerRunningFlag;
   }

   public static void setProducerRunningFlag(Boolean producerRunningFlag) {
       LockFlag.producerRunningFlag = producerRunningFlag;
   }

   public static Boolean getCustomerRunningFlag() {
       return customerRunningFlag;
   }

   public static void setCustomerRunningFlag(Boolean customerRunningFlag) {
       LockFlag.customerRunningFlag = customerRunningFlag;
   }
}


package com.qysxy.framework.queue;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Queue;
import java.util.concurrent.*;

/**
* @author fuguangli
* @description 前沿队列实用类,用于大量并发用户
* @Create date:    2017/3/7
*/
public class BlockingQueueHelper {


   private static final Integer maxQueueSize = 1000;
   private static BlockingQueue blockingQueue = new LinkedBlockingQueue(maxQueueSize);
   private static ExecutorService threadPool = Executors.newCachedThreadPool();


   public static BlockingQueue getBlockingQueue() {
       if (blockingQueue == null) {
           blockingQueue = new LinkedBlockingQueue(maxQueueSize);
       }
       return blockingQueue;
   }

   /**
    * @param o 队列处理对象(包含request,response,data)
    */
   public static void requestQueue(Object o) {
       //检测当前的队列大小
       if (blockingQueue != null && blockingQueue.size() < maxQueueSize) {
           //可以正常进入队列
           if (blockingQueue.offer(o)) {
               //添加成功,检测数据处理线程是否正常
               if (LockFlag.getCustomerRunningFlag()) {
                   //说明处理线程类正常运行
               } else {
                   //说明处理线程类停止,此时,应重新启动线程进行数据处理
                   LockFlag.setCustomerRunningFlag(true);

                   //example:run
                   Customer customer = new Customer(blockingQueue);
                   threadPool.execute(customer);

               }

           } else {
               //进入队列失败,做出相应的处理,或者尝试重新进入队列

           }
       } else {
           //队列不正常,或队列大小已达上限,做出相应处理

       }

   }
}



from:https://blog.csdn.net/happydecai/article/details/82775499

本站文章内容,部分来自于互联网,若侵犯了您的权益,请致邮件chuanghui423#sohu.com(请将#换为@)联系,我们会尽快核实后删除。
Copyright © 2006-2023 DBMNG.COM All Rights Reserved. Powered by DEVSOARTECH            豫ICP备11002312号-2

豫公网安备 41010502002439号