Compare commits

..

3 Commits

Author SHA1 Message Date
rony5394
87d8388810 feat: some safety checks 2026-04-18 16:35:45 +02:00
rony5394
d0d5168fd2 Some logging added.
This change is untested!
2026-04-13 15:31:06 +02:00
rony5394
4a33358596 Saving to storage.
New buildfile.
Storing backups in storage.
2026-04-09 15:32:54 +02:00
11 changed files with 264 additions and 48 deletions

View File

@@ -1,5 +1,6 @@
FROM docker.io/library/golang:1.25.5-alpine AS builder FROM --platform=$BUILDPLATFORM docker.io/library/golang:1.25.5-alpine AS builder
ARG TARGETARCH
WORKDIR /build WORKDIR /build
COPY go.mod go.sum ./ COPY go.mod go.sum ./
@@ -8,13 +9,13 @@ RUN go mod download
COPY . /build COPY . /build
RUN CGO_ENABLED=0 GOOS=linux go build -o /blazena RUN CGO_ENABLED=0 GOOS=linux GOARCH=$TARGETARCH go build -o /blazena
FROM docker.io/library/alpine:3.3 FROM docker.io/library/alpine:3.23
RUN apk add openssh rsync --no-cache RUN apk add openssh rsync btrfs-progs --no-cache
COPY --from=builder /blazena / COPY --from=builder --chmod=+x /blazena /
EXPOSE 1234 EXPOSE 1234
ENV MODE=invalid ENV MODE=invalid

View File

@@ -3,6 +3,7 @@ package config;
import ( import (
"os" "os"
"encoding/json" "encoding/json"
"errors"
); );
type Config struct { type Config struct {
@@ -26,14 +27,15 @@ type RegistryAuth struct {
Password string Password string
} }
func GetConfig() Config { func GetConfig()(Config, error){
var cfg Config; var cfg Config;
rawConfig, err := os.ReadFile("./config.json"); rawConfig, err := os.ReadFile("/config.json");
if err != nil{ if err != nil{
panic("Failed it load config file." + err.Error()); return cfg, errors.New("Failed it load config file." + err.Error());
} }
// Set defaults
cfg.Constants.OverlayNetworkName = "blazenaPohar"; cfg.Constants.OverlayNetworkName = "blazenaPohar";
cfg.Constants.HelperServiceName = "blazenaHelper"; cfg.Constants.HelperServiceName = "blazenaHelper";
cfg.Constants.StorageContainerName = "blazenaStorage"; cfg.Constants.StorageContainerName = "blazenaStorage";
@@ -42,8 +44,8 @@ func GetConfig() Config {
err = json.Unmarshal(rawConfig, &cfg); err = json.Unmarshal(rawConfig, &cfg);
if err != nil{ if err != nil{
panic("Failed to unmarshal config." + err.Error()) return cfg, errors.New("Failed to unmarshal config: " + err.Error());
} }
return cfg; return cfg, err;
} }

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log/slog"
"os" "os"
"os/signal" "os/signal"
"strings" "strings"
@@ -13,6 +14,7 @@ import (
"net/http" "net/http"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network" "github.com/docker/docker/api/types/network"
"github.com/docker/docker/api/types/swarm" "github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/client" "github.com/docker/docker/client"
@@ -32,22 +34,27 @@ type aService struct{
} }
func Run(Config cfg.Config){ func Run(Config cfg.Config){
theConfig = Config; // Before touching the line below think.
theConfig = Config;
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM); ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM);
var err error; var err error;
ApiClient, err = client.NewClientWithOpts(client.FromEnv); ApiClient, err = client.NewClientWithOpts(client.FromEnv);
if(err != nil){ if(err != nil){
panic("Docker client was not able to init from env!" + err.Error()); slog.Error("Failed to create docker client!", slog.String("note", "Try to look into DOCKER_HOST env var or check if socket exists and works"));
os.Exit(1);
} }
info, err := ApiClient.Info(context.Background()) info, err := ApiClient.Info(context.Background())
if(err != nil){ if(err != nil){
panic("Error getting info!" + err.Error()); slog.Error("Error getting info from docker socket!", slog.String("note", "This is kind of ping."));
os.Exit(1);
} }
if(!info.Swarm.ControlAvailable){ if(!info.Swarm.ControlAvailable){
panic("Node is not a swarm manager."); slog.Error("This node is not a swarm manager!");
os.Exit(1);
} }
server := &http.Server{ server := &http.Server{
@@ -68,7 +75,7 @@ func Run(Config cfg.Config){
stop(); stop();
}); });
ApiClient.NetworkCreate(context.Background(), theConfig.Constants.OverlayNetworkName, network.CreateOptions{ ApiClient.NetworkCreate(context.Background(), Config.Constants.OverlayNetworkName, network.CreateOptions{
Attachable: true, Attachable: true,
// Internal: true, // Internal: true,
Driver: "overlay", Driver: "overlay",
@@ -84,7 +91,8 @@ func Run(Config cfg.Config){
} }
if(err != nil){ if(err != nil){
panic("Unable to start http server!" + err.Error()); slog.Error("Unable to start http server!", slog.Any("propagatedError", err));
os.Exit(1);
} }
}(); }();
@@ -96,7 +104,7 @@ func Run(Config cfg.Config){
fmt.Println("Stopping http server."); fmt.Println("Stopping http server.");
server.Close(); server.Close();
ApiClient.NetworkRemove(context.Background(), theConfig.Constants.OverlayNetworkName); ApiClient.NetworkRemove(context.Background(), Config.Constants.OverlayNetworkName);
ApiClient.ConfigRemove(context.Background(), "blazenaSSHPublicKey") ApiClient.ConfigRemove(context.Background(), "blazenaSSHPublicKey")
ApiClient.SecretRemove(context.Background(), "blazenaSSHHostPrivateKey"); ApiClient.SecretRemove(context.Background(), "blazenaSSHHostPrivateKey");
@@ -110,6 +118,7 @@ func bearerAuth(w http.ResponseWriter, r *http.Request)bool {
if authHeader != expected { if authHeader != expected {
w.WriteHeader(http.StatusUnauthorized) w.WriteHeader(http.StatusUnauthorized)
fmt.Fprintln(w, "Unauthorized") fmt.Fprintln(w, "Unauthorized")
slog.Warn("Unauthorized request received", slog.Any("request", *r));
return false; return false;
} }
return true; return true;
@@ -131,6 +140,17 @@ func listServices(w http.ResponseWriter, r *http.Request){
var services []aService; var services []aService;
nodes, err := ApiClient.NodeList(context.Background(), swarm.NodeListOptions{});
var validNodeHostnames []string;
for _, node := range nodes{
validNodeHostnames = append(validNodeHostnames, node.Description.Hostname);
}
SERVICES:
for _, service := range list{ for _, service := range list{
var settings map[string]string = service.Spec.Labels; var settings map[string]string = service.Spec.Labels;
@@ -141,6 +161,30 @@ func listServices(w http.ResponseWriter, r *http.Request){
targetVolumes := strings.Split(settings["blazena.volumes"], ","); targetVolumes := strings.Split(settings["blazena.volumes"], ",");
var validVolumeNames []string;
for _, mnt := range service.Spec.TaskTemplate.ContainerSpec.Mounts{
if mnt.Type != mount.TypeVolume {
continue
}
validVolumeNames = append(validVolumeNames, mnt.Source);
}
if !contains(validNodeHostnames, settings["blazena.node"]) {
errMsg := "Node with hostname:'"+ settings["blazena.node"] +"' does not exist.";
slog.Warn("Invalid Service Config!", slog.String("serviceId", service.ID), slog.String("errMessage", errMsg));
continue SERVICES;
}
for _, volume := range targetVolumes{
if contains(validVolumeNames, volume){
continue;
}
errMsg := "Volume name '"+ volume + "' is not in the service spec!";
slog.Warn("Invalid Service Config!", slog.String("serviceId", service.ID), slog.String("errMessage", errMsg));
continue SERVICES;
}
services = append(services, aService{ services = append(services, aService{
ServiceId: service.ID, ServiceId: service.ID,
VolumeNames: targetVolumes, VolumeNames: targetVolumes,
@@ -152,7 +196,8 @@ func listServices(w http.ResponseWriter, r *http.Request){
bytes, err := json.Marshal(services); bytes, err := json.Marshal(services);
if err != nil{ if err != nil{
panic("Error during response encoding!" + err.Error()); slog.Error("Error during response encoding!", slog.Any("propagatedError", err));
os.Exit(1);
} }
fmt.Fprint(w, string(bytes)); fmt.Fprint(w, string(bytes));

View File

@@ -2,17 +2,17 @@ package docker
import ( import (
"context" "context"
"encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"encoding/base64"
"time" "time"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/api/types/registry"
"github.com/docker/docker/api/types/image" "github.com/docker/docker/api/types/image"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/registry"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/client" "github.com/docker/docker/client"
cfg "github.com/rony5394/blazena/config" cfg "github.com/rony5394/blazena/config"

1
go.mod
View File

@@ -19,6 +19,7 @@ require (
github.com/felixge/httpsnoop v1.0.4 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/sys/atomicwriter v0.1.0 // indirect github.com/moby/sys/atomicwriter v0.1.0 // indirect
github.com/moby/term v0.5.2 // indirect github.com/moby/term v0.5.2 // indirect

View File

@@ -1,4 +0,0 @@
FROM alpine:3
RUN mkdir -p /root/.ssh/
RUN apk add openssh rsync --no-cache

View File

@@ -52,7 +52,6 @@ func createStorageContainer(Config cfg.Config, DockerClient *client.Client, sshS
Type: mount.TypeBind, Type: mount.TypeBind,
Source: Config.LocalBasePath, Source: Config.LocalBasePath,
Target: "/volume", Target: "/volume",
ReadOnly: true,
}, },
}, },
//AutoRemove: true, //AutoRemove: true,

View File

@@ -1,10 +1,13 @@
package host package host
import ( import (
"bytes"
"encoding/json" "encoding/json"
"io" "io"
"log/slog"
"net/http" "net/http"
"bytes" "os"
cfg "github.com/rony5394/blazena/config" cfg "github.com/rony5394/blazena/config"
); );
@@ -19,12 +22,15 @@ func exchangeKeys(Config cfg.Config, sshKeyPem string)string{
if err != nil { if err != nil {
panic("Failed to marshal body."+ err.Error()); panic("Failed to marshal body."+ err.Error());
slog.Error("Failed to marshal body.", slog.Any("propagatedError", err), slog.String("note", "Input for this marshal operation is that ssh pk. So the kebab is going on!"))
os.Exit(42);
} }
rq, err := http.NewRequest("POST", Config.DockerManagerBaseUrl + "/keys", bytes.NewBuffer(bodyEncoded)); rq, err := http.NewRequest("POST", Config.DockerManagerBaseUrl + "/keys", bytes.NewBuffer(bodyEncoded));
if err != nil{ if err != nil{
panic("Failed to create http request"+ err.Error()); slog.Error("Failed to create request", slog.Any("propagatedError", err), slog.String("note", "not send just create the object"));
os.Exit(1);
} }
rq.Header.Set("Authorization", "Bearer "+ token); rq.Header.Set("Authorization", "Bearer "+ token);
@@ -32,7 +38,8 @@ func exchangeKeys(Config cfg.Config, sshKeyPem string)string{
rs, err := http.DefaultClient.Do(rq); rs, err := http.DefaultClient.Do(rq);
if err != nil{ if err != nil{
panic("Failed to send http request"+ err.Error()); slog.Error("Failed to send http request", slog.Any("propagatedError", err));
os.Exit(1);
} }
defer rs.Body.Close(); defer rs.Body.Close();
@@ -40,7 +47,8 @@ func exchangeKeys(Config cfg.Config, sshKeyPem string)string{
rsBodyRaw, err := io.ReadAll(rs.Body); rsBodyRaw, err := io.ReadAll(rs.Body);
if err != nil{ if err != nil{
panic("Failed to read response's body!"+err.Error()); slog.Error("Failed to read response body!" , slog.Any("propagatedError", err));
os.Exit(1);
} }
var rsBody struct{ var rsBody struct{
@@ -49,7 +57,8 @@ func exchangeKeys(Config cfg.Config, sshKeyPem string)string{
err = json.Unmarshal(rsBodyRaw, &rsBody); err = json.Unmarshal(rsBodyRaw, &rsBody);
if err != nil{ if err != nil{
panic("Failed to unmarshal rsBodyRaw!"+ err.Error()); slog.Error("Failed to unmarshal rsBodyRaw!", slog.Any("propagatedError", err));
os.Exit(1);
} }

View File

@@ -5,8 +5,10 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"log/slog"
"net/http" "net/http"
"os" "os"
"time" "time"
@@ -43,17 +45,24 @@ func Run(Config cfg.Config) {
services := getServices(Config); services := getServices(Config);
for _, service := range services { for _, service := range services {
fmt.Println("Scaling Down: "+service.ServiceId)
scale(Config, service.ServiceId, false);
fmt.Println("Done!");
for _, volume := range service.VolumeNames{
fmt.Println("Preparing: " + service.ServiceId + " volume: " + volume);
if !prepareService(Config, service, volume) {continue}
fmt.Println("Done!");
// Skiping Host Key Check is temporary. slog.Info("Scaling Down", slog.String("serviceId", service.ServiceId));
scale(Config, service.ServiceId, false);
slog.Info("Done");
for _, volume := range service.VolumeNames{
slog.Info("Preparing", slog.String("serviceId", service.ServiceId), slog.String("volumeId", volume));
if !prepareService(Config, service, volume) {continue}
slog.Info("Done");
targetStoragePath, _ := generateStoragePath(Config, service.Node, volume, DockerClient);
sourceStoragePath := "root@tasks."+ Config.Constants.HelperServiceName +":/volume";
slog.Debug("targetStoragePath", slog.String("value", targetStoragePath), slog.String("serviceId", service.ServiceId));
slog.Debug("sourceStoragePath", slog.String("value", sourceStoragePath), slog.String("serviceId", service.ServiceId));
command := `rsync -avz --delete -e "ssh -i /ssh-key -p 2222 -o StrictHostKeyChecking=yes -o UserKnownHostsFile=/expected-host-key" \ command := `rsync -avz --delete -e "ssh -i /ssh-key -p 2222 -o StrictHostKeyChecking=yes -o UserKnownHostsFile=/expected-host-key" \
root@tasks.`+ Config.Constants.HelperServiceName +`:/volume/ /tmp/` + volume; `+ sourceStoragePath +" "+ targetStoragePath;
exec, err := DockerClient.ContainerExecCreate(context.Background(), Config.Constants.StorageContainerName, container.ExecOptions{ exec, err := DockerClient.ContainerExecCreate(context.Background(), Config.Constants.StorageContainerName, container.ExecOptions{
Cmd: []string{"sh", "-c", command}, Cmd: []string{"sh", "-c", command},
@@ -62,30 +71,39 @@ func Run(Config cfg.Config) {
Tty: false, Tty: false,
}); });
if err != nil { if err != nil {
panic("Failed to create rsync exec!"+err.Error()); slog.Error("Failed to create rsync exec!", slog.Any("propagatedError", err));
os.Exit(1);
} }
resp, err := DockerClient.ContainerExecAttach(context.Background(), exec.ID, container.ExecStartOptions{}); resp, err := DockerClient.ContainerExecAttach(context.Background(), exec.ID, container.ExecStartOptions{});
if err != nil {
slog.Error("Failed to create container exec!", slog.Any("propagatedError", err));
}
defer resp.Close(); defer resp.Close();
io.Copy(os.Stdout, resp.Reader) io.Copy(os.Stdout, resp.Reader)
time.Sleep(30*time.Second); time.Sleep(30*time.Second);
fmt.Println("Cleaning Up: " + service.ServiceId); slog.Info("Cleaning Up", slog.String("serviceId", service.ServiceId), slog.String("volumeId", volume));
cleanupService(Config, service); cleanupService(Config, service);
fmt.Println("Done!"); slog.Info("Done!");
} }
fmt.Println("Scaling up: "+service.ServiceId); slog.Info("Scaling Up", slog.String("serviceId", service.ServiceId));
scale(Config, service.ServiceId, true); scale(Config, service.ServiceId, true);
fmt.Println("Done!"); slog.Info("Done!");
} }
DockerClient.ContainerRemove(context.Background(), Config.Constants.StorageContainerName, container.RemoveOptions{ DockerClient.ContainerRemove(context.Background(), Config.Constants.StorageContainerName, container.RemoveOptions{
Force: true, Force: true,
}); });
if !shutdown(Config){panic("Failed to shutdown docker api!");} if !shutdown(Config){
slog.Error("Failed to shutdown docker api!");
os.Exit(1);
}
slog.Info("Finished whole backup run.");
} }
func getServices(Config cfg.Config)[]aService{ func getServices(Config cfg.Config)[]aService{
@@ -185,3 +203,95 @@ func addToTar(tw *tar.Writer, filename string, content string) error{
return err; return err;
} }
func createIfMissing(targetPath string, DockerClient *client.Client, cfg cfg.Config) error{
const cmd = `#!/bin/sh
set -e
TARGET_PATH=$1
# Remove trailing slash
TARGET_PATH=${TARGET_PATH%/}
CURRENT=""
case "$TARGET_PATH" in
/*) CURRENT="/" ;;
esac
OLD_IFS=$IFS
IFS='/'
for PART in $TARGET_PATH; do
[ -z "$PART" ] && continue
if [ "$CURRENT" = "/" ]; then
NEXT="${CURRENT}${PART}"
else
NEXT="${CURRENT}/${PART}"
fi
if [ ! -e "$NEXT" ]; then
case "$PART" in
@*)
echo "Creating Btrfs subvolume: $NEXT"
btrfs subvolume create "$NEXT"
;;
*)
echo "Creating directory: $NEXT"
mkdir "$NEXT"
;;
esac
else
echo "Already exists: $NEXT"
fi
CURRENT="$NEXT"
done
IFS=$OLD_IFS`;
exec, err := DockerClient.ContainerExecCreate(context.Background(), cfg.Constants.StorageContainerName, container.ExecOptions{
Cmd: []string{"sh", "-c", cmd, "_", targetPath},
AttachStdout: true,
AttachStderr: true,
Tty: false,
});
if err != nil {
panic("Failed to create execute!"+err.Error());
}
resp, err := DockerClient.ContainerExecAttach(context.Background(), exec.ID, container.ExecStartOptions{});
defer resp.Close();
if err != nil {
panic("Failed to atach to exec!"+err.Error());
}
inspect, err := DockerClient.ContainerExecInspect(context.Background(), exec.ID);
if(inspect.ExitCode != 0){
fmt.Println("<resp>");
io.Copy(os.Stdout, resp.Reader);
fmt.Println("</resp>");
return errors.New("Execution did return non zero code!");
}
return nil;
}
func generateStoragePath(cfg cfg.Config, node string, volumeId string, DockerClient *client.Client) (string, error){
var path string;
path += "/volume";
path += "/@"+ node +"/@"+ volumeId;
err := createIfMissing(path, DockerClient, cfg);
if err != nil {
return "", err;
}
return path, nil;
}

27
main.go
View File

@@ -5,14 +5,39 @@ import (
"github.com/rony5394/blazena/docker" "github.com/rony5394/blazena/docker"
"github.com/rony5394/blazena/host" "github.com/rony5394/blazena/host"
cfg "github.com/rony5394/blazena/config" cfg "github.com/rony5394/blazena/config"
"log/slog"
); );
/*
If the exit code is X then it means Y:
| X | Y |
|----|------------------------------------------------------------------------------------------|
| 0 | Everything should be good, normal exit. |
| 1 | Some common error, but that still mean it is going to crash. |
| 3 | Ask yourself if you are not using dev version in prod. If not then spam the developer. |
| 42 | WHAT THE ACTUAL ***** IS HAPPENING. or assume something is very wrong in the app itself. |
| 69 | [INSERT HERE] |
*/
func main() { func main() {
if(len(os.Args) < 2){ if(len(os.Args) < 2){
panic("Usage: blazena <mode>"); panic("Usage: blazena <mode>");
} }
var config = cfg.GetConfig(); logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug,
}));
slog.SetDefault(logger);
config, err := cfg.GetConfig();
if(err != nil){
slog.Error("Failed to load config!", slog.Any("propagatedError", err.Error()));
os.Exit(1);
}
slog.Debug("Config", slog.Any("Value", config));
mode := os.Args[1]; mode := os.Args[1];
switch mode { switch mode {

28
shared/trace.go Normal file
View File

@@ -0,0 +1,28 @@
package shared
import (
"log/slog"
"github.com/google/uuid"
);
func NewTraceId()string{
return uuid.New().String();
}
func helper(name string, id *string)slog.Attr{
if id == nil{
return slog.String(name, NewTraceId());
}
return slog.String(name, *id);
}
func NewSlogTrace(id *string)slog.Attr{
return helper("trace", id);
}
func NewSlogOperation(id *string)slog.Attr{
return helper("operation", id);
}