请教一个关于socket通信的丢包问题,附上图片
先给大家分析一下程序。服务端:主要有一个队列,入队和出队均已加锁;另有一个队列监听程序,输出"_"表示队空,"."表示队中有元素,"@"表示队满,并打印出响应的个数。
客户端:主要计算发送成功和发送失败的次数,并统计平均每秒的响应个数。
现在程序的主要问题是:一开始失败次数为0,运行一段时间后,失败次数曲线突然猛增,这一点我搞不清楚原因。在这里请教大家,请给点意见,谢谢!!
服务端:
package test0815;
import *;
import
import
import
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Entry point of the server side.
 *
 * Listens on a fixed TCP port, hands every accepted connection to a
 * fixed-size handler pool and starts the queue consumers plus the
 * periodic queue probe.
 */
public class Server {
    private static final int PORT = 10000; // listening port
    private static final int POOL_SIZE = 8, MAX_THREAD = 25;

    private ServerSocket ss;
    private ExecutorService es;

    /**
     * Opens the listening socket and creates the handler pool.
     *
     * @throws IOException if the server socket cannot be opened
     */
    public Server() throws IOException {
        ss = new ServerSocket(PORT);
        es = Executors.newFixedThreadPool(POOL_SIZE);
    }

    /**
     * Starts the consumers and the probe, then accepts connections forever.
     * This method never returns.
     */
    public void service() {
        System.out.println("server starting...[2008-08-15]v4");
        final SocketQueue queue = new SocketQueue();
        startConsumers(queue);
        final Thread probe = new ProbeThread(queue);
        probe.start();
        for (;;) {
            acceptOne(queue);
        }
    }

    /** Starts MAX_THREAD threads that all run the same Consumer instance. */
    private void startConsumers(SocketQueue queue) {
        final Consumer customer = new Consumer(queue);
        int started = 0;
        while (started < MAX_THREAD) {
            new Thread(customer).start();
            started++;
        }
    }

    /** Accepts one connection and submits a Handler for it; failures are only reported. */
    private void acceptOne(SocketQueue queue) {
        try {
            final Socket accepted = ss.accept();
            es.execute(new Handler(accepted, queue));
        } catch (Exception ex) {
            System.out.println("Error[Server.service]:" + ex.getMessage());
        }
    }

    public static void main(String args[]) {
        try {
            final Server server = new Server();
            server.service();
        } catch (Exception ex) {
            System.out.println("Error[Server.main]:" + ex.getMessage());
        }
    }
}
/**
 * Per-connection task: reads a single byte from the client socket,
 * pushes it onto the shared queue and closes the socket.
 */
class Handler implements Runnable {
    private Socket socket = null;
    private SocketQueue queue;

    /**
     * @param s     the accepted client connection
     * @param queue the queue that receives the byte read from the client
     */
    public Handler(Socket s, SocketQueue queue) {
        this.queue = queue;
        this.socket = s;
    }

    /**
     * Reads one byte, enqueues it and always closes the socket afterwards.
     * The stage label is updated before each step so that an error report
     * shows which step was about to run.
     */
    public void run() {
        String stage = "start";
        try {
            stage = "BufferedReader.before";
            DataInputStream in =
                    new DataInputStream(new BufferedInputStream(socket.getInputStream()));

            stage = "br.readLine.before";
            byte payload = in.readByte();

            stage = "queue.enqueue.before";
            queue.enqueue(payload); // autoboxed, the queue stores Object

            in.close();
        } catch (Exception ex) {
            System.out.println("Error[Handler.run.1]at " + stage + ":" + ex.getMessage());
        } finally {
            closeSocket();
        }
    }

    /** Closes the client socket if there is one, reporting (not throwing) on failure. */
    private void closeSocket() {
        if (socket == null) {
            return;
        }
        try {
            socket.close();
        } catch (Exception ex) {
            System.out.println("Error[Handler.run.2]:" + "socket close failed!");
        }
    }
}
/**
 * Bounded circular buffer shared by producers (Handler) and consumers.
 *
 * All queue state is guarded by a single ReentrantLock with two conditions:
 * producers wait on {@code full} while the buffer is full, consumers wait on
 * {@code empty} while it is empty. For self-checking, the queue keeps running
 * sums of the first character of every element's toString() on push and pop.
 */
class SocketQueue {
    private static final int LENGTH = 5; //20;
    // Consume counters sampled by the two most recent getInfo() calls.
    private static int iLastConsume = 0, iThisConsume = 0;
    private final Object[] buffer = new Object[LENGTH];
    private int front, rear, counter;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition full = lock.newCondition();
    private final Condition empty = lock.newCondition();
    private int accPush, accPop;
    // volatile: written by stop() and read by consumer threads without the lock.
    private volatile boolean run = true;

    /** @return false once stop() has been called */
    public boolean isRun() {
        return this.run;
    }

    /**
     * Clears the run flag. Threads that are already blocked inside
     * dequeue() are not woken by this call.
     */
    public void stop() {
        this.run = false;
    }

    /**
     * Builds a short status token for the probe.
     *
     * The token is one symbol followed by {@code [n]}:
     * "_" empty, "." neither empty nor full, "@" full, "!" when the
     * push/pop checksums disagree (or the check itself failed).
     * {@code n} is the change in Finance's consume counter since the
     * previous call, divided by 2 (presumably because the probe samples
     * about every two seconds).
     *
     * @return the status token
     */
    public String getInfo() {
        boolean consistent = false;
        int flagQ = 0;
        int iConsumePerSecond = 0;
        lock.lock();
        try {
            iLastConsume = iThisConsume;
            iThisConsume = Finance.getSc();
            iConsumePerSecond = (iThisConsume - iLastConsume) / 2;

            int occupied = 0;
            int pending = 0; // checksum of the elements still in the buffer
            for (int i = 0; i < this.buffer.length; i++) {
                if (this.buffer[i] != null) {
                    occupied++;
                    pending = pending + (int) (this.buffer[i].toString().charAt(0));
                }
            }
            if (this.buffer.length == occupied) flagQ = 2;
            else if (occupied == 0) flagQ = 0;
            else flagQ = 1;
            // Everything pushed must be either popped or still in the buffer.
            consistent = this.accPop + pending == this.accPush;
        } catch (Exception ex) {
            System.out.println("Error[SocketQueue.getInfo]:" + ex.getMessage());
        } finally {
            lock.unlock();
        }

        String sym;
        if (consistent) {
            switch (flagQ) {
                case 0: sym = "_"; break;  // queue is empty
                case 1: sym = "."; break;  // neither empty nor full
                case 2: sym = "@"; break;  // queue is full
                default: sym = "?";        // anything else
            }
        } else {
            sym = "!";
        }
        return sym + "[" + iConsumePerSecond + "]";
    }

    /**
     * Appends an element, blocking while the buffer is full.
     *
     * @param o element to store; must be non-null with a non-empty toString()
     * @throws InterruptedException if interrupted while waiting for space
     * @throws NullPointerException if {@code o} is null
     * @throws StringIndexOutOfBoundsException if {@code o.toString()} is empty
     */
    public void enqueue(Object o) throws InterruptedException {
        // Compute the checksum contribution before touching any queue state,
        // so an invalid element fails here instead of leaving a filled slot
        // that the element counter does not account for.
        int weight = (int) o.toString().charAt(0);
        lock.lock();
        try {
            while (buffer.length == counter) { // buffer full
                Finance.produceWait();         // record the wait
                full.await();                  // wait for a consumer to free a slot
            }
            buffer[rear] = o;
            accPush = accPush + weight;
            if (++rear == buffer.length) rear = 0; // wrap the tail index
            ++counter;
            Finance.doProduce(buffer);         // record the produce action
            empty.signal();                    // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest element, blocking while the buffer is empty.
     *
     * @return the element at the head of the queue
     * @throws InterruptedException if interrupted while waiting for an element
     */
    public Object dequeue() throws InterruptedException {
        lock.lock();
        try {
            while (counter == 0) {             // buffer empty
                Finance.consumeWait();         // record the wait
                empty.await();                 // wait for a producer to add an element
            }
            Object o = buffer[front];
            accPop = accPop + (int) o.toString().charAt(0);
            buffer[front] = null;              // clear the slot
            if (++front == buffer.length) front = 0; // wrap the head index
            --counter;
            Finance.doConsume(buffer);         // record the consume action
            full.signal();                     // wake one waiting producer
            return o;
        } finally {
            lock.unlock();
        }
    }
}
/**
 * Queue consumer: keeps removing elements from the shared queue for as
 * long as the queue reports that it is running. The removed elements
 * are discarded.
 */
class Consumer implements Runnable {
    SocketQueue queue; // shared queue, the same instance the producers use
    Socket socket;

    /** @param sq the queue to drain */
    public Consumer(SocketQueue sq) {
        queue = sq;
    }

    /** Drains the queue until it is stopped or any exception occurs. */
    public void run() {
        try {
            for (;;) {
                if (!queue.isRun()) {
                    return;
                }
                queue.dequeue(); // take one element and drop it
            }
        } catch (Exception ex) {
            System.out.println("Error[Consumer.run]:" + ex.getMessage());
        }
    }
}
/**
 * Process-wide statistics: four plain static counters for produce
 * actions (sp), consume actions (sc), producer waits (dp) and consumer
 * waits (dc). The class performs no synchronization of its own.
 */
class Finance {
    private static int sp, sc, dp, dc;

    /** Records one produce action; the argument is not used. */
    public static void doProduce(Object[] buffer) {
        sp = sp + 1;
    }

    /** Records one consume action; the argument is not used. */
    public static void doConsume(Object[] buffer) {
        sc = sc + 1;
    }

    /** Records that a producer had to wait. */
    public static void produceWait() {
        dp = dp + 1;
    }

    /** Records that a consumer had to wait. */
    public static void consumeWait() {
        dc = dc + 1;
    }

    /** @return number of produce actions recorded so far */
    public static int getSp() {
        return sp;
    }

    /** @return number of consume actions recorded so far */
    public static int getSc() {
        return sc;
    }

    /** @return number of producer waits recorded so far */
    public static int getDp() {
        return dp;
    }

    /** @return number of consumer waits recorded so far */
    public static int getDc() {
        return dc;
    }
}
/**
 * Background thread that prints the queue's status token and then
 * sleeps for 1999 ms, repeating for as long as its run flag is set.
 */
class ProbeThread extends Thread {
    private SocketQueue queue;
    private boolean run = true;

    /** @param sq the queue whose status is printed */
    public ProbeThread(SocketQueue sq) {
        queue = sq;
    }

    @SuppressWarnings("static-access")
    public void run() {
        while (run) {
            String status = queue.getInfo();
            System.out.print(status);
            napBetweenSamples();
        }
    }

    /** Sleeps between two samples; any exception is reported and otherwise ignored. */
    private void napBetweenSamples() {
        try {
            Thread.sleep(1000 * 2 - 1);
        } catch (Exception ex) {
            System.out.println("Error[ProbeThread.run]:" + ex.getMessage());
        }
    }
}
客户端:
package test0815;
import *;
import *;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Load generator: roughly once per millisecond it submits a task that
 * opens a connection to the server, sends one random byte in 'A'..'Z'
 * and closes the connection. Successful and failed sends are counted
 * separately and reported by a Monitor thread.
 */
public class Client {
    private static final String ADDRESS = "127.0.0.1";
    private static final int PORT = 10000;
    private static final int POOL_SIZE = 10;

    /**
     * Starts the monitor and submits send tasks forever; never returns normally.
     */
    public static void main(String args[]) throws Exception {
        Counter c1 = new Counter(); // successful sends
        Counter c2 = new Counter(); // failed sends

        // Record the start time BEFORE the monitor thread starts; otherwise
        // its first report may compute the elapsed time from a start of 0.
        Timer.begin();
        new Monitor(c1, c2).start();

        ExecutorService es = Executors.newFixedThreadPool(POOL_SIZE);
        while (true) {
            es.execute(createTask(c1, c2));
            try {
                Thread.sleep(1);
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        }
    }

    /**
     * Creates a task that performs one connect / send / close cycle.
     *
     * @param c1 incremented after the byte has been written and flushed
     * @param c2 incremented when any step of the cycle throws
     * @return the task to hand to the executor
     */
    private static Runnable createTask(final Counter c1, final Counter c2) {
        return new Runnable() {
            public void run() {
                Socket socket = null;
                try {
                    socket = new Socket(ADDRESS, PORT);
                    DataOutputStream os = new DataOutputStream(socket.getOutputStream());
                    byte c = (byte) (Math.random() * 26 + 'A');
                    os.writeByte(c);
                    os.flush();
                    c1.doCount();
                    os.close();
                } catch (Exception ex) {
                    c2.doCount();
                    // Report why the send failed; counting alone hides whether the
                    // connect or the write failed and with which exception.
                    System.err.println("Error[Client.task]:" + ex);
                } finally {
                    try {
                        if (socket != null)
                            socket.close();
                    } catch (Exception ex) {
                        System.out.println(ex.getMessage());
                    }
                }
            }
        };
    }
}
/**
 * Thread-safe event counter. Both the increment and the read take the
 * same lock, so a reader always sees the increments that completed
 * before its read.
 */
class Counter {
    private int counter = 0;
    private final ReentrantLock lock = new ReentrantLock();

    /** Increments the counter by one. */
    public void doCount() {
        lock.lock();
        try {
            ++counter;
        } finally {
            lock.unlock();
        }
    }

    /**
     * @return the current count
     */
    public int getCounter() {
        // Read under the lock that guards the writes; an unlocked read of a
        // plain int has no visibility guarantee across threads.
        lock.lock();
        try {
            return counter;
        } finally {
            lock.unlock();
        }
    }
}
/**
 * Holds the wall-clock start time of the run, in milliseconds since the
 * epoch. The value is 0 until begin() has been called.
 */
class Timer {
    // volatile: set by one thread and read by another; also makes reads and
    // writes of the long atomic.
    private static volatile long begin;

    /** Records the current time as the start of the run. */
    public static void begin() {
        begin = System.currentTimeMillis();
    }

    /** @return the recorded start time, or 0 if begin() was never called */
    public static long getBegin() {
        return begin;
    }
}
/**
 * Reporting thread: once per second prints the success and failure
 * counts, the elapsed time since Timer's start and the average number
 * of successful responses per second.
 */
class Monitor extends Thread {
    private Counter success, failed;

    /**
     * @param success counter of successful sends
     * @param failed  counter of failed sends
     */
    public Monitor(Counter success, Counter failed) {
        this.failed = failed;
        this.success = success;
    }

    /** Prints one report line, sleeps one second, and repeats forever. */
    public void run() {
        for (;;) {
            System.out.println(buildReport());
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        }
    }

    /** Assembles a single report line from the current counter values. */
    private String buildReport() {
        final long s = success.getCounter();
        final long f = failed.getCounter();
        StringBuilder line = new StringBuilder();
        line.append("successful:").append(s).append(" ");
        line.append("failed:").append(f).append(" ");
        final long passed = System.currentTimeMillis() - Timer.getBegin();
        line.append("timepassed:").append(passed).append("ms ");
        if (passed > 0) {
            // successes per millisecond, scaled to per second
            float perSecond = (((float) s / passed)) * 1000;
            line.append("平均每秒响应:").append(perSecond);
        }
        return line.toString();
    }
}