/// Byte-stream transports for redisd: a std.socket implementation plus optional vibe-d and hio ones.
2 module redisd.connection;
3 
4 import std.algorithm;
5 import std.conv;
6 import std.stdio;
7 
8 import url:URL;
/// Minimal byte-stream transport contract implemented by every backend in this module.
interface Connection {
    /// Establishes the connection to the host and port carried by `url`.
    void connect(URL url) @safe;
    /// Transmits `data` and returns the count reported by the backend.
    /// NOTE(review): backends in this module may report failure as a wrapped
    /// negative value (i.e. a very large `size_t`) - confirm how callers check it.
    size_t send(immutable(ubyte)[] data);
    /// Reads from the peer, asking for at most `to_receive` bytes.
    /// The implementations in this module return an empty slice when nothing
    /// was read or the read failed.
    immutable(ubyte)[] recv(size_t to_receive);
    /// Closes the underlying transport.
    void close();
}
20 
/// Type of a `@safe` factory function that builds a new `Connection`.
alias ConnectionMaker = Connection function() @safe;
/// Transport backed by a `std.socket.Socket` (IPv4, stream/TCP).
class SocketConnection : Connection {
    import std.exception : assumeUnique;
    import std.socket;
    private {
        string  _host;   // host of the last `connect` call
        ushort  _port;   // port of the last `connect` call
        Socket  _socket;
    }

    /// Creates the underlying stream socket and enables TCP_NODELAY on it.
    /// No connection is attempted here; call `connect` for that.
    this() @safe {
        _socket = new Socket(AddressFamily.INET, SocketType.STREAM);
        _socket.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
    }

    /// Connects the socket to `url.host` : `url.port` and records that endpoint.
    /// Errors raised by address construction or by the socket propagate to the caller.
    override void connect(URL url) @safe {
        _host = url.host;
        _port = url.port;
        auto addr = new InternetAddress(_host, _port);
        _socket.connect(addr);
    }

    /// Receives at most `to_receive` bytes with a single `Socket.receive` call.
    ///
    /// Returns: the bytes actually read, or an empty slice when the call
    /// reported zero or a negative result.
    override immutable(ubyte)[] recv(size_t to_receive) {
        // Receive into a *mutable* buffer: writing through a cast-away
        // `immutable` (as the previous code did) is undefined behavior in D.
        auto buffer = new ubyte[](to_receive);
        immutable received = _socket.receive(buffer);
        immutable size_t count = received > 0 ? cast(size_t) received : 0;
        // The buffer is local and never aliased, so declaring the filled
        // prefix immutable is sound.
        return assumeUnique(buffer[0 .. count]);
    }

    /// Sends `data` with a single `Socket.send` call and returns its result.
    ///
    /// The socket's signed return value is converted to `size_t` unchanged, so
    /// a negative error result shows up as a very large value; fewer bytes than
    /// `data.length` may be reported.
    override size_t send(immutable(ubyte)[] data) {
        // `immutable(ubyte)[]` converts implicitly to `const(void)[]`; no cast needed.
        return _socket.send(data, SocketFlags.NONE);
    }

    /// Closes the underlying socket.
    override void close() {
        _socket.close();
    }

}
/// Factory for the `std.socket` transport; its signature matches `ConnectionMaker`.
Connection stdConnectionMaker() @safe {
    Connection transport = new SocketConnection();
    return transport;
}
67 
68 version(vibe) {
69     import vibe.vibe;
70     import vibe.core.net;
71     import eventcore.core;
72     import std.exception;
73     import std.socket;
74 
75     /// vibe-d connection builder
76     Connection vibeConnectionMaker() @safe {
77         return new VibeSocketConnection();
78     }
79     /// vibe-d transport
80     class VibeSocketConnection: Connection {
81         private {
82             TCPConnection _socket;
83         }
84         ///
85         this() @safe {
86         }
87         override void connect(URL url) {
88             auto a = getAddressInfo(url.host, AddressFamily.INET);
89             _socket = connectTCP(a[0].address.toAddrString, url.port);
90         }
91 
92         override immutable(ubyte)[] recv(size_t to_receive) {
93             ubyte[] result;
94             result.length = to_receive;
95             auto r = _socket.read(result, IOMode.once);
96             if (r <= 0) {
97                 return assumeUnique(result[0 .. 0]);
98             }
99             return assumeUnique(result[0..r]);
100         }
101 
102         override size_t send(immutable(ubyte)[] data) {
103             try {
104                 auto r = _socket.write(data, IOMode.all);
105                 return r;
106             } catch (Exception e) {
107                 logError(e.toString);
108                 return -1;
109             }
110         }
111 
112         override void close() {
113             _socket.close();
114         }
115     }
116 }
117 
118 version(hio) {
119     import std.datetime;
120     import std.socket;
121     import std.format;
122     import hio.socket;
123     import hio.events;
124     /// hio connection builder
125     Connection hioConnectionMaker() @safe {
126         return new HioSocketConnection();
127     }
128     /// hio transport
129     class HioSocketConnection : Connection {
130         private {
131             HioSocket   _socket;
132         }
133 
134         this() @safe {
135             _socket = new HioSocket();
136         }
137 
138         override void connect(URL url) {
139             auto a = getAddressInfo(url.host, AddressFamily.INET);
140             _socket.connect("%s:%d".format(a[0].address.toAddrString, url.port), 1.seconds);
141         }
142 
143         override immutable(ubyte)[] recv(size_t to_receive) {
144             immutable(ubyte)[] result;
145             result.length = to_receive;
146             IOResult r = _socket.recv(to_receive);
147             if ( r.error || r.input.length == 0) {
148                 return result[0..0];
149             }
150             return r.input;
151         }
152 
153         override size_t send(immutable(ubyte)[] data) {
154             return _socket.send(data, 1.seconds);
155         }
156 
157         override void close() {
158             _socket.close();
159         }
160     }
161 }