| 网站首页 | 业界新闻 | 小组 | 威客 | 人才 | 下载频道 | 博客 | 代码贴 | 在线编程 | 编程论坛
欢迎加入我们,一同切磋技术
用户名:   
 
密 码:  
共有 3557 人关注过本帖
标题:请教一个关于socket通信的丢包问题,附上图片
只看楼主 加入收藏
林中剑影
Rank: 1
来 自:广州
等 级:新手上路
帖 子:91
专家分:0
注 册:2007-11-3
收藏
 问题点数:0 回复次数:2 
请教一个关于socket通信的丢包问题,附上图片
先给大家分析一下程序,
服务端:主要有队列,队列进出已加锁,还有队列监听程序,"_"为队空,"."队中有元素,"@"队满。并打印出响应的个数。
客户端:主要计算发送成功和发送失败的次数,并统计平均每秒的响应个数。

现在程序的主要问题是,一开始失败次数为0,过了一段时间后,失败次数曲线猛增长了,这点我搞不清楚?在这里请教大家给点意见,谢谢!!

服务端:
package test0815;

import *;
import
import
import
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;


public class Server {
    private ServerSocket ss;
    private ExecutorService es;
    private static final int PORT = 10000;                // 监听端口s
    private static final int POOL_SIZE = 8, MAX_THREAD=25;
   
    
    public Server() throws IOException{
        this.ss = new ServerSocket(this.PORT);
        this.es = Executors.newFixedThreadPool(this.POOL_SIZE);
    }
    
    
    public void service() {
        
        System.out.println("server starting...[2008-08-15]v4");
        SocketQueue queue = new SocketQueue();
        Consumer customer = new Consumer(queue);
        
        for(int i=0;i<MAX_THREAD;i++){
            new Thread(customer).start();
        }
        
        Thread probe = new ProbeThread(queue);
        probe.start();
        
        while(true) {
            try {
                Socket socket = this.ss.accept();
                this.es.execute(new Handler(socket,queue));
                
            } catch(Exception ex) {
                System.out.println("Error[Server.service]:" + ex.getMessage());
            }
        }
    }
    
    public static void main(String args[]) {
        try {
            new Server().service();
        } catch(Exception ex) {
            System.out.println("Error[Server.main]:"+ex.getMessage());
        }
    }
}


class Handler implements Runnable {
    private Socket socket = null;
    private SocketQueue queue;
    
    public Handler(Socket s,SocketQueue queue) {
        this.socket = s;
        this.queue=queue;
    }
    
    public void run() {
        String s1="start";
        try {
            s1="BufferedReader.before";
            DataInputStream is = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
            
            s1="br.readLine.before";
            byte b = is.readByte();
    
            s1="queue.enqueue.before";
            this.queue.enqueue(b);
            
            is.close();
            
        } catch(Exception ex) {
            System.out.println("Error[Handler.run.1]at "+s1+":" + ex.getMessage());
        } finally {
            try {
                if (this.socket != null)
                    this.socket.close();    
            } catch(Exception ex) {
                System.out.println("Error[Handler.run.2]:"+"socket close failed!");
            }
        }
    }    
}

class SocketQueue{
    private static final int LENGTH = 5;  //20;                 
    private static int iLastConsume=0, iThisConsume=0;
    private final Object[] buffer = new Object[LENGTH];        
    private int front, rear, counter;                            
    
    private final ReentrantLock lock = new ReentrantLock();        
    private final Condition full = lock.newCondition();            
    private final Condition empty = lock.newCondition();
    
    private int accPush, accPop;
    private boolean run = true;
    
    public boolean isRun(){
        return this.run;
    }
    
    public void stop() {                                    
        this.run = false;
    }
    
    public String getInfo() {
        lock.lock();
        int buffer = 0;
        boolean b = false;
        int flagQ=0, countBuffer=0;
        String sym;
        
        iLastConsume = iThisConsume;
        iThisConsume = Finance.getSc();
        int iConsumePerSecond = (iThisConsume-iLastConsume) / 2;
        
        try {
            for(int i =0; i < this.buffer.length; i++) {
                if (this.buffer[i] != null) {
                    countBuffer++;
                    buffer = buffer + (int)(this.buffer[i].toString().charAt(0));
                }
            }
            
            if (this.buffer.length==countBuffer) flagQ=2;
            else if (countBuffer==0) flagQ=0;
            else flagQ=1;

            b = this.accPop + buffer == this.accPush;
        } catch(Exception ex) {
            System.out.println("Error[SocketQueue.getInfo]:"+ex.getMessage());
        }finally {
            lock.unlock();
        }
        
        if (b) {
            switch (flagQ) {
            case 0: sym="_"; break;                             //队列为空
            case 1: sym="."; break;                             //队列既不空也不满
            case 2: sym="@"; break;                             //队列为满
            default: sym="?";                                   //其他情况
            }
        }
        else
            sym="!";
        
        sym = sym+"["+(int)iConsumePerSecond+"]";
        return sym;
    }
    
    public void enqueue(Object o) throws InterruptedException{
        lock.lock();                                            // 对操作进行锁定
    
        try {
            while(buffer.length == counter) {                    // 如果信号量已满
                Finance.produceWait();                            // 记录等待次数
                full.await();                                    // full条件满足,等待出列
            }
            
            buffer[rear] = o;                                    // 入列
            accPush = accPush + (int)o.toString().charAt(0);
            if (++rear == buffer.length) rear = 0;                // 尾指针控制
            ++counter;                                            // 信号量控制
            Finance.doProduce(buffer);                            // 记录生产动作
            empty.signal();                                        // 给empty发送信号,唤醒其它线程        
        } finally {
            lock.unlock();                                        // 无论如何都要释放锁
        }
    }
    
    
    
    public Object dequeue() throws InterruptedException{
        lock.lock();                                            // 对操作进行锁定
        try {
            while(counter == 0) {                                // 当没有信号量时
                Finance.consumeWait();                            // 记录等待次数
                empty.await();                                    // empty条件满足,等待入列数据
            }
            
            Object o = buffer[front];
            accPop = accPop + (int)o.toString().charAt(0);
            buffer[front] = null;                                // 出列
            if (++front == buffer.length) front = 0;            // 头指针控制
            --counter;                                            // 信号量控制
            Finance.doConsume(buffer);                            // 记录消费动作
            
            full.signal();                                        // 给full发送信号,唤醒其它线程
            return o;
        } finally {
            lock.unlock();                                        // 无论如何都要释放锁
        }
    }
}


class Consumer implements Runnable {
    SocketQueue queue;                                        // 同步队列,与生产者使用同一SyncQueue实例(线程target)
    
    Socket socket;
    public Consumer(SocketQueue sq) {
        this.queue = sq;
    }
    
    public void run() {
        try {
            while(this.queue.isRun()) {
                this.queue.dequeue();                           // 出列
            }
        } catch(Exception ex) {
            System.out.println("Error[Consumer.run]:"+ex.getMessage());
        }
    }
}

class Finance {
    private static int sp, sc, dp, dc;

    public static int getSp(){
        return sp;
    }
    
    public static int getSc() {
        return sc;
    }
    
    public static int getDp() {
        return dp;
    }
    
    public static int getDc() {
        return dc;
    }

    public static void doProduce(Object[] buffer) {
        ++sp;
        
    }
    
    public static void doConsume(Object[] buffer) {
        ++sc;
    
    }
    
    public static void produceWait() {
        ++dp;
    }
    
    public static void consumeWait() {
        ++dc;
    }
}


class ProbeThread extends Thread {
    private SocketQueue queue;
    private boolean run = true;
    
    public ProbeThread(SocketQueue sq) {
        this.queue = sq;
    }
    
    @SuppressWarnings("static-access")
    public void run() {
        while(this.run) {
            System.out.print(this.queue.getInfo());
            
            try {
                Thread.sleep(1000*2-1);
            } catch(Exception ex) {
                System.out.println("Error[ProbeThread.run]:"+ex.getMessage());
            }
        }
    }
}

客户端:
package test0815;

import *;
import *;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;


public class Client {
    public static void main(String args[]) throws Exception{
            Counter c1 = new Counter();
            Counter c2 = new Counter();
            new Monitor(c1, c2).start();
            Timer.begin();
                
            int poolsize = 10;
            ExecutorService es = Executors.newFixedThreadPool(poolsize);
                
            for(int i=0; ;i++){
                es.execute(createTask(c1,c2));
                //new Thread(createTask(c1,c2)).start();
                try{
                    Thread.sleep(1);
                }catch(Exception e){
                    System.out.println(e.getMessage());
                }
            }
        }
            

    
    private static Runnable createTask(final Counter c1, final Counter c2) {
        return new Runnable() {
            Socket socket = null;
            private final int PORT = 10000;
            private final String ADDRESS = "127.0.0.1";
            
            public void run() {
                try {
                    this.socket = new Socket(this.ADDRESS, this.PORT);
                    //BufferedReader br = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
                    
                    DataOutputStream os = new DataOutputStream(socket.getOutputStream());
                    byte c = (byte)(Math.random() * 26 + 'A');
                    os.writeByte(c);
                    os.flush();
                    
                    c1.doCount();
                    os.close();
                } catch(Exception ex) {
                    c2.doCount();
                } finally {
                    try {
                        if(this.socket != null)
                            this.socket.close();
                    } catch(Exception ex) {
                        System.out.println(ex.getMessage());
                    }
                }                        
            }                
        };
    }
}
    
class Counter{    
    private int counter=0;
    private final ReentrantLock lock = new ReentrantLock();
    
    public void doCount(){
        lock.lock();
        try{
            ++counter;
        }finally{
            lock.unlock();
        }
    }
    
    public int getCounter(){
        return counter;
    }
}



class Timer{
    private static long begin;
    
    public static  void begin(){
        begin=System.currentTimeMillis();
    }
    
    public static long getBegin(){
        return begin;
    }
}
        

class Monitor extends Thread{
    private Counter success,failed;
    
    public Monitor(Counter success,Counter failed){
        this.success=success;
        this.failed=failed;
    }
    public void run(){
        while(true){
            StringBuffer ab = new StringBuffer();
            long s = success.getCounter();
            long f = failed.getCounter();
            
            ab.append("successful:" + s + "  ");
            ab.append("failed:" + f + "  ");
            
            long passed = System.currentTimeMillis()-Timer.getBegin();
            ab.append("timepassed:"+ passed + "ms  ");
            
            if(passed>0)
                ab.append("平均每秒响应:" + ((((float)s / passed)) * 1000));
            
            System.out.println(ab.toString());
            try{
                Thread.sleep(1000);
            }catch(Exception e){
                System.out.println(e.getMessage());
            }
            
        }
    }
}

Q1.jpg (84.98 KB)
图片附件: 游客没有浏览图片的权限,请 登录注册


QQ2.jpg (19.92 KB)
图片附件: 游客没有浏览图片的权限,请 登录注册
搜索更多相关主题的帖子: socket 通信 
2008-08-18 23:12
林中剑影
Rank: 1
来 自:广州
等 级:新手上路
帖 子:91
专家分:0
注 册:2007-11-3
收藏
得分:0 
急啊,有没有人遇到过socket通信的丢包问题啊?

好好学习,天天向上!
2008-08-20 21:37
林中剑影
Rank: 1
来 自:广州
等 级:新手上路
帖 子:91
专家分:0
注 册:2007-11-3
收藏
得分:0 
有没有人看的明白啊!?帮帮忙啊

好好学习,天天向上!
2008-08-30 23:42
快速回复:请教一个关于socket通信的丢包问题,附上图片
数据加载中...
 
   



关于我们 | 广告合作 | 编程中国 | 清除Cookies | TOP | 手机版

编程中国 版权所有,并保留所有权利。
Powered by Discuz, Processed in 0.022134 second(s), 8 queries.
Copyright©2004-2024, BCCN.NET, All Rights Reserved