/// Transport abstractions used to exchange raw bytes with a server.
module redisd.connection;

import std.algorithm;
import std.conv;
import std.stdio;

import url:URL;

/// Minimal byte-oriented transport interface implemented by every backend in this module.
interface Connection {
    /// Connect to the host and port taken from the given URL.
    void connect(URL) @safe;
    /// Send the given bytes and return the value reported by the underlying transport.
    size_t send(immutable(ubyte)[]);
    /// Receive at most the requested number of bytes.
    /// Implementations in this module return an empty slice when nothing was read
    /// (transport error or no data).
    immutable(ubyte)[] recv(size_t);
    /// Close the underlying transport.
    void close();
}

/// Signature of a factory function that builds a new, not yet connected, Connection.
alias ConnectionMaker = Connection function() @safe;

/// std.socket transport
class SocketConnection : Connection {
    import std.socket;
    private {
        // NOTE(review): _host and _port are never read or written in this module.
        string _host;
        ushort _port;
        Socket _socket;
    }

    /// Create an IPv4 stream socket with the TCP_NODELAY option set to 1.
    this() @safe {
        _socket = new Socket(AddressFamily.INET, SocketType.STREAM);
        _socket.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
    }

    /// Connect the socket to `url.host`:`url.port`.
    override void connect(URL url) @safe {
        string host = url.host;
        ushort port = url.port;
        auto addr = new InternetAddress(host, port);
        _socket.connect(addr);
    }

    /// Receive up to `to_receive` bytes.
    /// Returns: the bytes actually read, or an empty slice when `Socket.receive`
    /// reported zero or a negative value.
    override immutable(ubyte)[] recv(size_t to_receive) {
        import std.exception : assumeUnique;

        // Read into a mutable buffer: writing through a cast of an immutable
        // array is undefined behaviour. The buffer is never shared, so it can
        // be handed out as immutable once the read is finished.
        auto buffer = new ubyte[](to_receive);
        immutable received = _socket.receive(buffer);
        if ( received <= 0 ) {
            return assumeUnique(buffer[0 .. 0]);
        }
        return assumeUnique(buffer[0 .. received]);
    }

    /// Send `data` with no socket flags.
    /// Returns: the result of `Socket.send` converted to `size_t`; a negative
    /// result therefore appears as a very large value.
    override size_t send(immutable(ubyte)[] data) {
        // Socket.send takes const(void)[], so no cast away from immutable is needed.
        return _socket.send(data, SocketFlags.NONE);
    }

    /// Close the socket.
    override void close() {
        _socket.close();
    }

}

/// std.socket connection builder
Connection stdConnectionMaker() @safe {
    return new SocketConnection();
}

version(vibe) {
    import vibe.vibe;
    import vibe.core.net;
    import eventcore.core;
    import std.exception;
    import std.socket;

    /// vibe-d connection builder
    Connection vibeConnectionMaker() @safe {
        return new VibeSocketConnection();
    }

    /// vibe-d transport
    class VibeSocketConnection: Connection {
        private {
            TCPConnection _socket;
        }

        /// Nothing to set up; the TCP connection is created in `connect`.
        this() @safe {
        }

        /// Resolve `url.host` (IPv4) and open a TCP connection to the first
        /// returned address on `url.port`.
        override void connect(URL url) {
            auto a = getAddressInfo(url.host, AddressFamily.INET);
            _socket = connectTCP(a[0].address.toAddrString, url.port);
        }

        /// Perform a single read (`IOMode.once`) of up to `to_receive` bytes.
        /// Returns: the bytes read, or an empty slice when the read returned zero.
        override immutable(ubyte)[] recv(size_t to_receive) {
            ubyte[] result;
            result.length = to_receive;
            auto r = _socket.read(result, IOMode.once);
            if (r <= 0) {
                return assumeUnique(result[0 .. 0]);
            }
            return assumeUnique(result[0..r]);
        }

        /// Write all of `data` (`IOMode.all`).
        /// Returns: the value returned by the write, or `size_t.max` after an
        /// exception has been caught and logged.
        override size_t send(immutable(ubyte)[] data) {
            try {
                auto r = _socket.write(data, IOMode.all);
                return r;
            } catch (Exception e) {
                logError(e.toString);
                // Same value the previous `return -1` converted to, spelled explicitly.
                return size_t.max;
            }
        }

        /// Close the TCP connection.
        override void close() {
            _socket.close();
        }
    }
}

version(hio) {
    import std.datetime;
    import std.socket;
    import std.format;
    import hio.socket;
    import hio.events;

    /// hio connection builder
    Connection hioConnectionMaker() @safe {
        return new HioSocketConnection();
    }

    /// hio transport
    class HioSocketConnection : Connection {
        private {
            HioSocket _socket;
        }

        /// Create the underlying HioSocket.
        this() @safe {
            _socket = new HioSocket();
        }

        /// Resolve `url.host` (IPv4) and connect to the first returned address
        /// on `url.port`, passing `1.seconds` to the socket's connect call.
        override void connect(URL url) {
            auto a = getAddressInfo(url.host, AddressFamily.INET);
            _socket.connect("%s:%d".format(a[0].address.toAddrString, url.port), 1.seconds);
        }

        /// Receive up to `to_receive` bytes.
        /// Returns: the received input, or an empty (null) slice when the result
        /// is flagged as an error or carries no data.
        override immutable(ubyte)[] recv(size_t to_receive) {
            IOResult r = _socket.recv(to_receive);
            if ( r.error || r.input.length == 0) {
                // No scratch buffer is allocated just to produce an empty slice.
                return null;
            }
            return r.input;
        }

        /// Send `data`, passing `1.seconds` to the socket's send call.
        override size_t send(immutable(ubyte)[] data) {
            return _socket.send(data, 1.seconds);
        }

        /// Close the socket.
        override void close() {
            _socket.close();
        }
    }
}