///
module redisd.client;

import std.typecons;
import std.stdio;
import std.algorithm;
import std.range;
import std.string;

import std.meta: allSatisfy;
import std.traits: isSomeString;

import std.experimental.logger;

import redisd.connection;
import redisd.codec;

import url: URL, parseURL;

private immutable bufferSize = 4*1024;

/// Thrown when AUTH is rejected or the server answers with a NOAUTH error.
class NotAuthenticated : Exception {
    ///
    this(string msg) {
        super(msg);
    }
}

/// client API
class Client {

    private {
        URL             _url;
        ConnectionMaker _connection_maker;
        Connection      _connection;
        Decoder         _input_stream;

        // Error replies are matched by position: the text "Protocol error"
        // is expected at offsets [4 .. 18] of the error string.
        enum size_t protocolErrorStart = 4;
        enum size_t protocolErrorEnd   = 18;
        enum string protocolErrorText  = "Protocol error";
        enum string noAuthText         = "NOAUTH";
    }

    /// Constructor.
    ///
    /// Params:
    ///   url = server URL; when it carries a password, AUTH is sent right after connecting.
    ///   connectionMaker = factory used to create the connection (also used on reconnect).
    ///
    /// Throws: NotAuthenticated when the AUTH reply is not "OK".
    this(string url="redis://localhost:6379", ConnectionMaker connectionMaker=&stdConnectionMaker) {
        _url = parseURL(url);
        _connection_maker = connectionMaker;
        // Initial connect is the same sequence as a reconnect:
        // make connection, connect, fresh decoder, optional AUTH.
        reconnect();
    }

    /// Create a new connection with the stored maker, reset the decoder and,
    /// when the URL has a password, authenticate again.
    ///
    /// Throws: NotAuthenticated when the AUTH reply is not "OK".
    private void reconnect() {
        _connection = _connection_maker();
        _connection.connect(_url);
        _input_stream = new Decoder;
        if (_url.pass) {
            auto auth = execCommand("AUTH", _url.pass);
            if (auth.svar != "OK") {
                throw new NotAuthenticated("Can't authenticate");
            }
        }
    }

    /// True when `v` is an error reply carrying "Protocol error" at the expected offset.
    /// The length is checked first so that shorter error texts cannot cause
    /// an out-of-range slice.
    private static bool isProtocolError(RedisdValue v) {
        if (v.type != ValueType.Error) {
            return false;
        }
        auto msg = v.svar;
        return msg.length >= protocolErrorEnd
            && msg[protocolErrorStart .. protocolErrorEnd] == protocolErrorText;
    }

    /// True when `v` is an error reply that starts with "NOAUTH".
    /// Length-guarded for the same reason as `isProtocolError`.
    private static bool isAuthRequired(RedisdValue v) {
        if (v.type != ValueType.Error) {
            return false;
        }
        auto msg = v.svar;
        return msg.length >= noAuthText.length
            && msg[0 .. noAuthText.length] == noAuthText;
    }

    /// Close the current connection and open a new one.
    private void reopenConnection() {
        _connection.close();
        debug(redisd) trace("reopen connection");
        reconnect();
    }

    /// Build redis command from command name and args.
    /// All args must be of type string.
    RedisdValue makeCommand(A...)(A args) {
        static assert(allSatisfy!(isSomeString, A), "all command parameters must be of type string");
        return redisdValue(tuple(args));
    }

    /// Build and execute redis transaction from command array.
    ///
    /// Sends MULTI, then every command in order, then EXEC.
    /// Returns: the reply to EXEC.
    RedisdValue transaction(RedisdValue[] commands) {
        this.execCommand("MULTI");
        foreach (c; commands) {
            exec(c);
        }
        return this.execCommand("EXEC");
    }

    /// build and execute redis pipeline from commands array.
    ///
    /// All commands are encoded and sent in one write; replies are then collected
    /// until there is one per command, the connection yields no more data, or
    /// a "Protocol error" reply is seen (in which case the connection is reopened
    /// and the replies gathered so far, including that error, are returned).
    RedisdValue[] pipeline(RedisdValue[] commands) {
        immutable(ubyte)[] data = commands.map!encode.join();
        _connection.send(data);
        RedisdValue[] response;
        while (response.length < commands.length) {
            debug(redisd) tracef("response length=%d, commands.length=%d", response.length, commands.length);
            auto b = _connection.recv(bufferSize);
            if (b.length == 0) {
                break;
            }
            _input_stream.put(b);
            // Drain every complete reply already buffered before reading more.
            while (true) {
                auto v = _input_stream.get();
                if (v.type == ValueType.Incomplete) {
                    break;
                }
                response ~= v;
                if (isProtocolError(v)) {
                    reopenConnection();
                    return response;
                }
            }
        }
        return response;
    }

    /// Send one already-built command and read its reply.
    ///
    /// Reads from the connection until the decoder yields a non-Incomplete value
    /// or recv returns no data. A "Protocol error" reply reopens the connection.
    ///
    /// Throws: NotAuthenticated when the reply is an error starting with "NOAUTH".
    private RedisdValue exec(RedisdValue command) {
        RedisdValue response;
        _connection.send(command.encode);
        while (true) {
            auto b = _connection.recv(bufferSize);
            if (b.length == 0) {
                // error, timeout or something bad
                break;
            }
            _input_stream.put(b);
            response = _input_stream.get();
            if (response.type != ValueType.Incomplete) {
                break;
            }
        }
        if (isProtocolError(response)) {
            reopenConnection();
        }
        if (isAuthRequired(response)) {
            throw new NotAuthenticated("Auth required");
        }
        return response;
    }

    /// build and execute single redis command.
    ///
    /// Throws: NotAuthenticated when the reply is an error starting with "NOAUTH".
    RedisdValue execCommand(A...)(A args) {
        return exec(makeCommand(args));
    }

    /// Simple key/value set
    RedisdValue set(K, V)(K k, V v) {
        return execCommand("SET", k, v);
    }

    /// Simple key/value get
    RedisdValue get(K)(K k) {
        return execCommand("GET", k);
    }

    /// Consume reply
    ///
    /// Tries the decoder first; only reads from the connection while the decoder
    /// reports Incomplete. A "Protocol error" reply reopens the connection.
    RedisdValue read() {
        RedisdValue response = _input_stream.get();
        while (response.type == ValueType.Incomplete) {
            auto b = _connection.recv(bufferSize);
            if (b.length == 0) {
                break;
            }
            _input_stream.put(b);
            response = _input_stream.get();
        }
        if (isProtocolError(response)) {
            reopenConnection();
        }
        return response;
    }

    /// Close the underlying connection.
    void close() {
        _connection.close();
    }
}