java socket 发送字节切片发送

一方write,一方read,有没有可能write正确返回,这时网络异常,read抛异常从而认为接收操作失败,如果这样的话,这个data不是可能丢失吗?java socket是否会保证不出现这种情况?
问题补充:可能我没描述清楚。我的意思是我希望data要么发送失败,于是留在发送端,我可以尝试再发,接收端也知道没有成功收到data,要么data发送成功,发送端和接收端都明确发送成功。那会不会发送方认为成功,接收方认为失败,于是data丢失[1]。或者发送方认为失败,于是重发,而接收方认为成功,造成接收方会收到2次data[2]。我想知道java socket会不会造成[1][2]发生,如何防御不使[1][2]发生那?谢谢。
如果你是基于UDP协议的话,那是不能保证的,因为发的一方不管收的一方的事。
而TCP是基于连接的,通信双方首先要三次握手建立连接,连接建立后就一直连着,然后开始发送数据,每一次发送方都要等着收到接收方的确认,如果没收到确认,那就说明没发成功,所有不会存在你说的一方认为发送成功了,另一方却没收到的情况。TCP连接关闭的时候是经过四次握手的,因为是全双工的,一方只能关闭自己的连接。
TCP 中,数据接收方需要确认数据的收到,发送方只有收到确认才认为成功了,所以如果发送方确认成功了,则接收方也是成功的。如果接收方收到了数据,但是回送确认的时候网络断了,则发送方认为是失败的,如果继续重新发送的话也会出现网络连接异常。在网络中断的情况下,当双方重新建立起连接时,就需要双方确认上次中断的地方,然后重新传输数据,这就是应用层的事情了。这是我个人的理解,如果有不对的地方,欢迎高手指正。
client 抛出异常,就把发生异常的数据保留,作为失败记录。
失败记录隔一段时间再发。
Service 设置接受超时,连接发生中断后,过超时就会退出。
data是可能丢失,处理方法是设置超时时间,如果发生异常,再次建立连接。
做个心跳测试监听,设置连接超时时间,异常处理,缓存,就解决了。
最好用NIO,可以使用Mina,Mina提供了sessionIdle,在指定时间没有接收到信息的话,就断开,当然异常机制也写的非常好。
已解决问题
未解决问题Java TCP/IP Socket 编程 笔记(四)—发送和接收数据 -
- ITeye技术网站
博客分类:
1.TCP/IP协议要求信息必须在块(chunk)中发送和接收,而块的长度必须是8位的倍数,因此,我们可以认为TCP/IP协议中传输的信息是字节序列。如何发送和解析信息需要一定的应用程序协议。
2.信息编码:
&&& 首先是Java里对基本整型的处理,发送时,要注意:1)每种数据类型的字节个数;2)这些字节的发送顺序是怎样的?(little-endian还是big-endian);3)所传输的数值是有符号的(signed)还是无符号的(unsigned)。具体编码时采用位操作(移位和屏蔽)就可以了。具体在Java里,可以采用DataOutputStream类和ByteArrayOutputStream来实现。恢复时可以采用DataInputStream类和ByteArrayInputStream类。
&&& 其次,字符串和文本,在一组符号与一组整数之间的映射称为编码字符集(coded character set)。发送者与接收者必须在符号与整数的映射方式上达成共识,才能使用文本信息进行通信,最简单的方法就是定义一个标准字符集。具体编码时采用String的getBytes()方法。
&&& 最后,位操作。如果设置一个特定的设为1,先设置好掩码(mask),之后用或操作;要清空特定一位,用与操作。
3.成帧与解析
成帧(framing)技术解决了接收端如何定位消息的首位位置的问题。
如果接收者试图从套接字中读取比消息本身更多的字节,将可能发生以下两种情况之一:如果信道中没有其他消息,接收者将阻塞等待,同时无法处理接收到的消息;如果发送者也在等待接收端的响应消息,则会形成死锁(dealock);另一方面,如果信道中还有其他消息,则接收者会将后面消息的一部分甚至全部读到第一条消息中去,这将产生一些协议错误。因此,在使用TCP套接字时,成帧就是一个非常重要的考虑因素。
有两个技术:
1. 基于定界符(Delimiter-based):消息的结束由一个唯一的标记(unique marker)指出,即发送者在传输完数据后显式添加的一个特殊字节序列。这个特殊标记不能在传输的数据中出现。幸运的是,填充(stuffing)技术能够对消息中出现的定界符进行修改,从而使接收者不将其识别为定界符。在接收者扫描定界符时,还能识别出修改过的数据,并在输出消息中对其进行还原,从而使其与原始消息一致。
2. 显式长度(Explicit length):在变长字段或消息前附加一个固定大小的字段,用来指示该字段或消息中包含了多少字节。这种方法要确定消息长度的上限,以确定保存这个长度需要的字节数。
接口:
import java.io.IOE
import java.io.OutputS
public interface Framer {
void frameMsg(byte [] message,OutputStream out) throws IOE
byte [] nextMsg() throws IOE
}
定界符的方式:
import java.io.ByteArrayOutputS
import java.io.EOFE
import java.io.IOE
import java.io.InputS
import java.io.OutputS
public class DelimFramer implements Framer {
private InputS//
private static final byte DELIMTER=(byte)'\n';//message delimiter
public DelimFramer(InputStream in){
public void frameMsg(byte[] message, OutputStream out) throws IOException {
//ensure that the message dose not contain the delimiter
for(byte b:message){
if(b==DELIMTER)
throw new IOException("Message contains delimiter");
out.write(message);
out.write(DELIMTER);
out.flush();
public byte[] nextMsg() throws IOException {
ByteArrayOutputStream messageBuffer=new ByteArrayOutputStream();
while((nextByte=in.read())!=DELIMTER){
if(nextByte==-1){//end of stream?
if(messageBuffer.size()==0){
throw new EOFException("Non-empty message without delimiter");
messageBuffer.write(nextByte);
return messageBuffer.toByteArray();
}
显式长度方法:
import java.io.DataInputS
import java.io.EOFE
import java.io.IOE
import java.io.InputS
import java.io.OutputS
public class LengthFramer implements Framer {
public static final int MAXMESSAGELENGTH=65535;
public static final int BYTEMASK=0
public static final int SHOTMASK=0
public static final int BYTESHIFT=8;
private DataInputS// wrapper for data I/O
public LengthFramer(InputStream in) throws IOException{
this.in=new DataInputStream(in);
public void frameMsg(byte[] message, OutputStream out) throws IOException {
if(message.length&MAXMESSAGELENGTH){
throw new IOException("message too long");
//write length prefix
out.write((message.length&&BYTEMASK)&BYTEMASK);
out.write(message.length&BYTEMASK);
//write message
out.write(message);
out.flush();
public byte[] nextMsg() throws IOException {
length=in.readUnsignedShort();
}catch(EOFException e){
//no (or 1 byte)
//0&=length&=65535;
byte [] msg=new byte[length];
in.readFully(msg);//if exception,it'
浏览: 177954 次
来自: 杭州
照着做了,但是不行啊,还是乱码.
您好,能不能把语言包给我发过来,我找不到。谢谢 1790958 ...
修改配置路径到 JDK 安装目录下的 jre 亦可
搂主是正确的,刚刚招到原因,我自己写了一个serde,里面用了 ...
请问,你这个做过测试不,我怎么自己试了不行呢 Text t = ...Java Socket长连接示例代码 - shihuan830619 - ITeye技术网站
博客分类:
SocketListenerPusher.java代码如下:
import java.io.IOE
import java.net.InetSocketA
import java.net.ServerS
import java.net.S
import java.util.concurrent.ExecutorS
import java.util.concurrent.E
import java.util.concurrent.ScheduledThreadPoolE
import java.util.concurrent.TimeU
import mons.configuration.ConfigurationE
import org.directwebremoting.impl.DaemonThreadF
import org.slf4j.L
import org.slf4j.LoggerF
import com.mon.utils.PropertiesU
import com.shihuan.dragonkeeper.global.ConfigF
public class SocketListenerPusher implements Runnable {
protected static Logger logger = LoggerFactory.getLogger(SocketListenerPusher.class);
public static String socketlistenerserver_CONFIG = ConfigFile.SOCKETLISTENERSERVER__CONFIG + ConfigFile.SUFFIX_NAME;
private ServerSocket serverS
private ExecutorS
public SocketListenerPusher() {
int port = 0;
int poolsize = 0;
port = Integer.parseInt(PropertiesUtil.getPropertiesValue(socketlistenerserver_CONFIG, "serverport"));
poolsize = Integer.parseInt(PropertiesUtil.getPropertiesValue(socketlistenerserver_CONFIG, "poolsize"));
serverSocket = new ServerSocket();
serverSocket.setReuseAddress(true);
serverSocket.bind(new InetSocketAddress(port));
pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * poolsize);
//下面两句循环执行run()方法, 相当于while(true){...}
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
executor.scheduleAtFixedRate(this, 1L, 1L, TimeUnit.MILLISECONDS);
} catch (NumberFormatException e) {
logger.error(e.getMessage(), e);
e.printStackTrace();
} catch (ConfigurationException e) {
logger.error(e.getMessage(), e);
e.printStackTrace();
} catch (IOException e) {
logger.error(e.getMessage(), e);
e.printStackTrace();
public void run() {
Socket socket =
socket = serverSocket.accept();
pool.execute(new SocketListenerHandler(socket));
} catch (IOException e) {
System.out.println("线程池被关闭!!!!!!!!!!!");
pool.shutdown();
logger.error(e.getMessage(), e);
e.printStackTrace();
SocketListenerHandler.java代码如下:
import java.io.BufferedInputS
import java.io.BufferedR
import java.io.IOE
import java.io.InputStreamR
import java.io.ObjectInputS
import java.net.S
import java.sql.C
import java.sql.SQLE
import mons.configuration.ConfigurationE
import mons.dbutils.DbU
import mons.dbutils.QueryR
import mons.io.IOU
import org.directwebremoting.B
import org.directwebremoting.ScriptS
import org.slf4j.L
import org.slf4j.LoggerF
import com.alibaba.fastjson.JSON;
import com.mon.dto.DataSourceI
import com.mon.utils.ByteArrayU
import com.mon.utils.DataSourceMapU
import com.mon.utils.DateFormatterU
import com.mon.utils.PropertiesU
import com.shihuan.dragonkeeper.global.ConfigF
import com.shihuan.dragonkeeper.server.bean.ActivityServiceB
public class SocketListenerHandler implements Runnable {
protected static Logger logger = LoggerFactory.getLogger(SocketListenerHandler.class);
private static String jdbc_CONFIG = ConfigFile.JDBC_CONFIG + ConfigFile.SUFFIX_NAME;
public static final int timeOut = 0*1000 ;
//设置读取操作异常为1秒
private final String dataRealTimeAction_id = "Agentdata_" + Math.random();
private static final String noData = "{'nodata':'心跳信息'}";
private static final String errorData = "{'error':'无法解析的请求'}";
private Socket connectedsocket =
public SocketListenerHandler(Socket socket){
this.connectedsocket =
public void run() {
BufferedReader in =
String resultData = "";
connectedsocket.setSoTimeout(timeOut);
//表示接收数据时的等待超时数据, 此方法必须在接收数据之前执行才有效. 此外, 当输入流的 read()方法抛出 SocketTimeoutException后, Socket仍然是连接的, 可以尝试再次读数据, 单位为毫秒, 它的默认值为 0(表示会无限等待, 永远不会超时)
connectedsocket.setKeepAlive(false);
//表示对于长时间处于空闲状态的Socket, 是否要自动把它关闭.
in = new BufferedReader(new InputStreamReader(connectedsocket.getInputStream()));
if (in.ready()) {
//判断流中是否有数据
resultData = getNoHeadData(in.readLine());
//从Agent端接收到的数据
("#### 结果DATA = "+resultData);
if (resultData==null || "".equals(resultData)) {
(dataRealTimeAction_id + " --&&& " + "内容为空!");
} else if (resultData.charAt(0) != '{') {
//要在客户端定时维持心跳信息
(dataRealTimeAction_id + " --&&& " + noData);
ActivityServiceBean asb = JSON.parseObject(resultData, ActivityServiceBean.class);
System.out.println("打印预处理信息Start......");
System.out.println(asb.getProxyname() + " -- " + asb.getIp() + " -- " + asb.getCalltime() + " -- " + asb.getAnswertime() + " -- " + asb.getCpu() + " -- " + asb.getThread() + " -- " + asb.getStatus() + " -- " + asb.getAccessaddress() + " -- " + asb.getAccessfilename() + " -- " + asb.getSql() + " -- " + asb.getContent());
System.out.println("打印预处理信息End......");
parseData(ois);
(dataRealTimeAction_id + ": 成功处理了接收到的数据!");
} catch (IOException e) {
logger.error(e.getMessage() + " " + errorData, e);
e.printStackTrace();
} catch (NumberFormatException e) {
logger.error(e.getMessage(), e);
e.printStackTrace();
} finally {
if (in != null) {
in.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
e.printStackTrace();
TestSocketListenerPusher.java请求端代码如下:
import java.io.BufferedOutputS
import java.io.IOE
import java.io.OutputS
import java.net.S
import java.net.UnknownHostE
import java.util.D
import mons.configuration.ConfigurationE
import com.alibaba.fastjson.JSON;
import com.mon.utils.ByteArrayU
import com.mon.utils.PropertiesU
import com.shihuan.dragonkeeper.global.ConfigF
import com.shihuan.dragonkeeper.server.bean.ActivityServiceB
public class TestSocketListenerPusher implements Runnable {
private static String socketlistenerserver_CONFIG = ConfigFile.SOCKETLISTENERSERVER__CONFIG + ConfigFile.SUFFIX_NAME;
private Socket socketclient =
public void run() {
String serverip = "";
int port = 0;
OutputStream os =
serverip = PropertiesUtil.getPropertiesValue(socketlistenerserver_CONFIG, "serverip");
port = Integer.parseInt(PropertiesUtil.getPropertiesValue(socketlistenerserver_CONFIG, "serverport"));
ActivityServiceBean asb =
for (int i=0; i&2; i++) {
asb = new ActivityServiceBean();
asb.setProxyname("testProxyname"+i);
asb.setIp("testIp"+i);
Date curdate = new Date();
asb.setCalltime(curdate);
asb.setAnswertime(curdate);
asb.setCpu("testCpu"+i);
asb.setThread("testThread"+i);
asb.setStatus("testStatus"+i);
asb.setAccessaddress("testAccessaddress"+i);
asb.setAccessfilename("testAccessfilename"+i);
asb.setSql("testSql"+i);
asb.setContent("testContent"+i);
String jsonStr = JSON.toJSONString(asb).trim();
byte[] information = (new String(ByteArrayUtil.getIntToByte(jsonStr.length()))+jsonStr).getBytes();
System.out.println(information.length);
socketclient = new Socket(serverip, port);
socketclient.setSoTimeout(0);
socketclient.setKeepAlive(false);
os = new BufferedOutputStream(socketclient.getOutputStream());
os.write(information);
os.flush();
System.out.println("Client" + i + " --&&& " + new String(ByteArrayUtil.getIntToByte(jsonStr.length()))+jsonStr);
os.close();
Thread.sleep(3000);
} catch (ConfigurationException e) {
e.printStackTrace();
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (os != null) {
os.close();
} catch (IOException e) {
e.printStackTrace();
public static void main(String[] args) {
Thread t = new Thread(new TestSocketListenerPusher());
t.start();
源代码在笔者邮箱网盘中J2EE代码文件夹里。
----------------------------------------------------------------------------------
如果是按byte[]传输数据的情况,请参考如下代码:
SimpleSocketServer.java代码如下:
package com.shihuan.
import java.io.BufferedInputS
import java.io.IOE
import java.io.InputS
import java.net.InetSocketA
import java.net.ServerS
import java.net.S
public class SimpleSocketServer {
public static void main(String[] args) {
ServerSocket ss = new ServerSocket();
ss.setReuseAddress(true);
//两个进程共用同一个端口的时候,一个进程关闭后,另一个进程还能够立刻重用相同端口
ss.setReceiveBufferSize(128*1024);
//缓冲区中允许接收的最大字节数,默认是8192
ss.bind(new InetSocketAddress(19990));
Socket client = ss.accept();
InputStream in = new BufferedInputStream(client.getInputStream());
byte tmpb = (byte)in.read();
System.out.println("第一个字节的byte值 ---&& " + tmpb);
System.out.println("接收字节 ---&& " + in.available());
byte[] bc = new byte[in.available()+1];
in.read(bc, 1, in.available());
System.out.println(bc.length);
System.out.println(new String(bc));
in.close();
} catch (IOException e) {
System.out.println(e.getMessage());
e.printStackTrace();
SimpleSocketClient.java代码如下:
package com.shihuan.
import java.io.BufferedOutputS
import java.io.IOE
import java.io.OutputS
import java.net.S
import java.net.UnknownHostE
public class SimpleSocketClient {
public static void main(String[] args) throws UnknownHostException {
Socket s = new Socket("192.168.1.10", 19990);
OutputStream os = new BufferedOutputStream(s.getOutputStream());
String info = "abc!";
info = "大家好!";
byte[] bi = info.getBytes();
os.write(bi);
os.flush();
os.close();
} catch (IOException e) {
System.out.println(e.getMessage());
e.printStackTrace();
稍微复杂一点儿代码示例,处理了粘包问题:
StartListenerTcpThread.java代码:
import java.io.BufferedInputS
import java.io.IOE
import java.io.InputS
import java.net.InetSocketA
import java.net.ServerS
import java.net.S
import java.net.SocketA
import java.util.V
import java.util.concurrent.ExcutorS
import java.util.concurrent.E
import mons.io.IU
import org.slf4j.L
import org.slf4j.LoggerF
import com.mon.utils.ByteArrayU
import com.shihuan.dragonkeeper.global.ConfigF
public class StartListenerTcpThread implements Runnable {
public static Logger logger = LoggerFactory.getLogger(StartListenerTcpThread.class);
private static ExcutorService Threadpool = Excutors.newCachedThreadPool();
private static boolean businessflag =
private static final int receiveBufferSize = 128;
private static Vector&byte[]& tmpbytes = new Vector&byte[]&();
private ServerSocket serverSocket =
public StartListenerTcpThread(String ip, int port){
serverSocket = new ServerSocket();
serverSocket.setReuseAddress(true);
serverSocket.setReceiveBufferSize(receiveBufferSize*1024);
serverSocket.setSoTimeout(0);
SocketAddress sa = new InetSocketAddress(port);
serverSocket.bind(sa, 20);
}catch(IOException e){
logger.error(e.getMessage(), e);
public void run(){
Socket socket =
while(true){
if(businessflag){
socket = serverSocket.accept();
System.out.println("New connection accepted " + socket.getInetAddress() + ":" + socket.getPort());
InputStream socketIn = new BufferedInputStream(socket.getInputStream());
byte tmpb = (byte)socketIn.read();
byte[] currentbytes =
if(tmpbytes.size() & 0){
//上一次IO流中有未处理的剩余包
int oldBytesLen = tmpbytes.get(0).
int socketBytesLen = socketIn.available()+1;
int currentLength = oldByteLen + socketBytesL
currentbytes = new byte[currentLength];
System.arraycopy(tmpbytes.get(0), 0, currentbytes, oldBytesLen);
currentbytes[oldBytesLen] =
socketIn.read(currentbytes, oldBytesLen+1, socketBytesLen-1);
socketIn.close();
splitInputStreamByte(currentbytes);
//正常未粘包情况
int socketBytesLen = socketIn.available()+1;
currentbytes = new byte[socketBytesLen];
currentbytes[0] =
socketIn.read(currentbytes, 1, socketBytesLen-1);
socketIn.close();
splitInputStreamByte(currentbytes);
}catch(IOException e){
logger.error(e.getMessage(), e);
* 拆分byte数组并分多线程处理
* @param parambytes 原byte数组
* @return 处理后剩余部分的byte数组
private static void splitInputStreamByte(byte[] parambytes) {
if(parambytes != null){
if(parambytes.length & 4){
byte[] head = new byte[4];
//单包长度
System.arraycopy(parambytes, 0, head, 0, 4);
int bodyLength = ByteArrayUtil.getint(head);
if(bodyLength &= parambytes.length-4){
final byte[] body = new byte[bodyLength];
System.arraycopy(parambytes, 4, body, 0, bodyLength);
ThreadPool.execute(new Runnable(){
public void run(){
byte[] processDatas =
System.out.println(IOUtils.toString(processDatas, "UTF-8").trim());
}catch(IOException e){
logger.error(e.getMessage(), e);
int resultLen = parambytes.length-4-bodyL
if(resultLen == 0){
splitInputStreamByte(null);
byte[] resultbytes = new byte[resultLen];
System.arraycopy(parambytes, 4+bodyLength, resultbytes, 0, resultLen);
splitInputStreamByte(resultbytes);
tmpbytes.clear();
tmpbytes.add(parambytes);
tmpbytes.clear();
tmpbytes.add(parambytes);
public static void openflag(){
businessflag =
public static void closeflag(){
businessflag =
TestTcpSocket.java代码:
import java.io.IOE
import java.io.OutputS
import java.net.S
import java.net.UnknownHostE
import com.mon.utils.ByteArrayU
import com.shihuan.dragonkeeper.global.ConfigF
public class TestTcpSocket implements Runnable{
private Socket socketClient =
public void run(){
String serverip = "192.168.1.10";
int port = 19990;
while(true){
System.out.println("SocketClient start......");
String mystr = "hello everyone!";
socketClient = new Socket(serverip, port);
OutputStream os = socketClient.getOutputStream();
byte[] head = ByteArrayUtil.int2byte(mystr.length());
byte[] body = mystr.getBytes();
byte[] total = ByteArrayUtil.byteMerge(head, body);
os.write(total);
os.flush();
os.close();
Thread.sleep(1000);
System.out.println("SocketClient end......");
}catch(Exception e){
logger.error(e.getMessage(), e);
public static void main(String[] args){
Thread t = new Thread(new TestTcpSocket());
t.start();
下面写ByteArrayUtil.java代码:
package com.mon.
public class ByteArrayUtil {
* 将int型的数据类型转换成byte[]类型
public static final byte[] int2byte(int paramInt){
byte[] resultByte = new byte[4];
resultByte[3] = ((byte)(paramInt & 0xFF));
resultByte[2] = ((byte)(paramInt &&& 8 & 0xFF));
resultByte[1] = ((byte)(paramInt &&& 16 & 0xFF));
resultByte[0] = ((byte)(paramInt &&& 24 & 0xFF));
return resultB
* 将byte型的数据类型转换成int类型
public static final int getint(byte[] paramArrayOfByte){
int result = (paramArrayOfByte[0] & 0xFF) && 24 | (paramArrayOfByte[1] & 0xFF) && 16 | (paramArrayOfByte[2] & 0xFF) && 8 | paramArrayOfByte[3] & 0xFF;
* 合并两个byte数组到一个byte数组中
public static byte[] byteMerge(byte[] byte1, byte[] byte2){
byte[] result = new byte[byte1.length+byte2.length];
System.arraycopy(byte1, 0, result, 0, byte1.length);
System.arraycopy(byte2, 0, result, byte1.length, byte2.length);
shihuan830619
浏览: 183514 次
来自: 上海
忘了附带邮箱了
朋友能否发一份源码吗!最近也在研究dubboz这块
大神求下源码
大神你好,求源码

我要回帖

更多关于 java socket 发送字节 的文章

 

随机推荐